001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.planning.plugin.asset;
028:
029: import java.util.Collection;
030: import java.util.Enumeration;
031: import java.util.Iterator;
032:
033: import org.cougaar.core.blackboard.AnonymousChangeReport;
034: import org.cougaar.core.blackboard.ChangeReport;
035: import org.cougaar.core.blackboard.IncrementalSubscription;
036: import org.cougaar.core.mts.MessageAddress;
037: import org.cougaar.core.plugin.ComponentPlugin;
038: import org.cougaar.core.service.LoggingService;
039: import org.cougaar.core.service.DomainService;
040: import org.cougaar.planning.Constants;
041: import org.cougaar.planning.ldm.PlanningFactory;
042: import org.cougaar.planning.ldm.asset.Asset;
043: import org.cougaar.planning.ldm.asset.LocalPG;
044: import org.cougaar.planning.ldm.plan.Allocation;
045: import org.cougaar.planning.ldm.plan.AllocationResult;
046: import org.cougaar.planning.ldm.plan.AspectType;
047: import org.cougaar.planning.ldm.plan.AssetTransfer;
048: import org.cougaar.planning.ldm.plan.HasRelationships;
049: import org.cougaar.planning.ldm.plan.NewSchedule;
050: import org.cougaar.planning.ldm.plan.PlanElement;
051: import org.cougaar.planning.ldm.plan.PrepositionalPhrase;
052: import org.cougaar.planning.ldm.plan.Relationship;
053: import org.cougaar.planning.ldm.plan.RelationshipSchedule;
054: import org.cougaar.planning.ldm.plan.Role;
055: import org.cougaar.planning.ldm.plan.Task;
056: import org.cougaar.planning.plugin.util.PluginHelper;
057: import org.cougaar.util.UnaryPredicate;
058:
059: /**
060: * AssetReportPlugin manages REPORTFORDUTY and REPORTFORSERVICE relationships
061: * Handles both expansion and allocation of these tasks.
062: * This plugin sees the tasks created by the AssetDataPlugin, and allocates
063: * them correctly so that the LPs force the Asset transfers.
064: */
065: public class AssetReportPlugin extends ComponentPlugin {
066: protected PlanningFactory myPlanningFactory;
067: private IncrementalSubscription myTasks;
068: private IncrementalSubscription myAssetTransfers;
069:
070: private IncrementalSubscription myLocalAssets;
071:
072: protected LoggingService myLogger;
073:
074: protected void setupSubscriptions() {
075: DomainService ds = (DomainService) getServiceBroker()
076: .getService(this , DomainService.class, null);
077: myPlanningFactory = (PlanningFactory) ds.getFactory("planning");
078: getServiceBroker()
079: .releaseService(this , DomainService.class, ds);
080: if (myPlanningFactory == null) {
081: throw new RuntimeException("Missing \"planning\" factory");
082: }
083:
084: myLogger = (LoggingService) getServiceBroker().getService(this ,
085: LoggingService.class, null);
086: if (myLogger == null) {
087: myLogger = LoggingService.NULL;
088: }
089:
090: // subscribe for incoming Report Tasks
091: myTasks = (IncrementalSubscription) getBlackboardService()
092: .subscribe(getTaskPredicate());
093:
094: // subscribe to my allocations in order to propagate allocationresults
095: myAssetTransfers = (IncrementalSubscription) getBlackboardService()
096: .subscribe(getAssetTransferPred());
097:
098: // subscribe to my local assets so I can propagate modifications
099: myLocalAssets = (IncrementalSubscription) getBlackboardService()
100: .subscribe(getLocalAssetPred());
101: }
102:
103: public void execute() {
104: // Handle REPORT tasks, expanding and allocating all at once
105: if (myTasks.hasChanged()) {
106: if (myLogger.isInfoEnabled())
107: myLogger.info(getAgentIdentifier()
108: + " had RFD or RFS task subscription fire");
109: Enumeration newtasks = myTasks.getAddedList();
110: while (newtasks.hasMoreElements()) {
111: Task currentTask = (Task) newtasks.nextElement();
112: if (myLogger.isInfoEnabled())
113: myLogger
114: .info(" with added Task to now allocate: "
115: + currentTask);
116: allocate(currentTask);
117: }
118: }
119:
120: // If get back a reported result, automatically send it up.
121: if (myAssetTransfers.hasChanged()) {
122: if (myLogger.isInfoEnabled())
123: myLogger
124: .info(getAgentIdentifier()
125: + " had AssetTransfer with RFD or RFS task subscription fire");
126: Enumeration changedallocs = myAssetTransfers
127: .getChangedList();
128: boolean didLog = false;
129: int notSent = 0;
130: PlanElement cpe = null;
131: while (changedallocs.hasMoreElements()) {
132: cpe = (PlanElement) changedallocs.nextElement();
133: if (PluginHelper.updatePlanElement(cpe)) {
134: if (myLogger.isInfoEnabled()) {
135: myLogger
136: .info(" with a changed PE to propagate up: "
137: + cpe);
138: didLog = true;
139: }
140: getBlackboardService().publishChange(cpe);
141: } else if (myLogger.isInfoEnabled()) {
142: notSent++;
143: }
144: }
145: if (!didLog && myLogger.isInfoEnabled())
146: myLogger
147: .info(" with "
148: + notSent
149: + " changed ATs but no PE changes to propagate. Last PE was: "
150: + cpe);
151: }
152:
153: if (myLocalAssets.hasChanged()) {
154: if (myLogger.isInfoEnabled())
155: myLogger
156: .info(getAgentIdentifier()
157: + " had Local HasRelationship asset subscription fire -- will resend AssetTransfers if the ChangeReport is not a RelationshipSchedule change");
158: resendAssetTransfers();
159: }
160: }
161:
162: /**
163: * getTaskPredicate - returns task predicate for task subscription
164: * Default implementation subscribes to all non-internal tasks. Derived classes
165: * should probably implement a more specific version.
166: *
167: * @return UnaryPredicate - task predicate to be used.
168: */
169: protected UnaryPredicate getTaskPredicate() {
170: return allReportTaskPred;
171: }
172:
173: protected UnaryPredicate getAssetTransferPred() {
174: return allReportAssetTransferPred;
175: }
176:
177: protected UnaryPredicate getLocalAssetPred() {
178: return new SelfOrgPredicate(getAgentIdentifier());
179: }
180:
181: private void allocate(Task task) {
182: if (task.getPlanElement() != null) {
183: myLogger.error(getAgentIdentifier().toString()
184: + "/AssetReportPlugin: unable to process "
185: + task.getUID()
186: + " - task already has a PlanElement - "
187: + task.getPlanElement() + ".\n");
188: return;
189: }
190:
191: Asset reportingAsset = task.getDirectObject();
192:
193: if (!reportingAsset.getClusterPG().getMessageAddress().equals(
194: getAgentIdentifier())) {
195: allocateRemote(task);
196: } else {
197: allocateLocal(task);
198: }
199: }
200:
201: private void allocateLocal(Task task) {
202: Asset reportingAsset = task.getDirectObject();
203: Asset reportee = (Asset) findIndirectObject(task,
204: Constants.Preposition.FOR);
205:
206: Asset localReportingAsset = findLocalAsset(reportingAsset);
207: if ((localReportingAsset == null)
208: || (!((HasRelationships) localReportingAsset).isLocal())) {
209: //(!localReportingAsset.getClusterPG().getMessageAddress().equals(getAgentIdentifier()))) {
210: myLogger.error(getAgentIdentifier().toString()
211: + "/AssetReportPlugin: unable to process "
212: + task.getVerb() + " task - " + reportingAsset
213: + " reporting to " + reportee + ".\n"
214: + reportingAsset + " not local to this agent.");
215: return;
216: }
217:
218: long startTime = (long) task
219: .getPreferredValue(AspectType.START_TIME);
220: long endTime = (long) task
221: .getPreferredValue(AspectType.END_TIME);
222:
223: // Make RelationshipSchedule for the reporting asset
224: Collection roles = (Collection) findIndirectObject(task,
225: Constants.Preposition.AS);
226: RelationshipSchedule schedule = myPlanningFactory
227: .newRelationshipSchedule((HasRelationships) reportingAsset);
228: for (Iterator iterator = roles.iterator(); iterator.hasNext();) {
229: Relationship relationship = myPlanningFactory
230: .newRelationship((Role) iterator.next(),
231: (HasRelationships) reportingAsset,
232: (HasRelationships) reportee, startTime,
233: endTime);
234: schedule.add(relationship);
235: }
236: ((HasRelationships) reportingAsset)
237: .setRelationshipSchedule(schedule);
238:
239: // create the transfer
240: NewSchedule availSchedule = myPlanningFactory
241: .newSimpleSchedule(startTime, endTime);
242:
243: AllocationResult newEstimatedResult = PluginHelper
244: .createEstimatedAllocationResult(task,
245: myPlanningFactory, 1.0, true);
246:
247: AssetTransfer assetTransfer = myPlanningFactory
248: .createAssetTransfer(task.getPlan(), task,
249: reportingAsset, availSchedule, reportee,
250: newEstimatedResult, Role.ASSIGNED);
251: getBlackboardService().publishAdd(assetTransfer);
252: }
253:
254: private void allocateRemote(Task task) {
255: Asset reportingAsset = task.getDirectObject();
256:
257: AllocationResult newEstimatedResult = PluginHelper
258: .createEstimatedAllocationResult(task,
259: myPlanningFactory, 1.0, true);
260:
261: Allocation allocation = myPlanningFactory.createAllocation(task
262: .getPlan(), task, reportingAsset, newEstimatedResult,
263: Role.ASSIGNED);
264: getBlackboardService().publishAdd(allocation);
265: return;
266: }
267:
268: protected Asset findLocalAsset(Asset asset) {
269: Object key = asset.getKey();
270: Asset localAsset = null;
271:
272: // Query subscription to see if clientAsset already exists
273: Collection collection = getBlackboardService().query(
274: new AssetPredicate(key));
275:
276: if (collection.size() > 0) {
277: Iterator iterator = collection.iterator();
278: localAsset = (Asset) iterator.next();
279:
280: if (iterator.hasNext()) {
281: throw new RuntimeException(
282: "AssetReportPlugin - multiple assets with UIC = "
283: + key);
284: }
285: }
286:
287: return localAsset;
288: }
289:
290: // ###############################################################
291: // END Allocation
292: // ###############################################################
293:
294: protected Object findIndirectObject(Task _task, String _prep) {
295: PrepositionalPhrase pp = _task.getPrepositionalPhrase(_prep);
296: if (pp == null)
297: throw new RuntimeException("Didn't find a single \""
298: + _prep + "\" Prepositional Phrase in " + _task);
299:
300: return pp.getIndirectObject();
301: }
302:
303: private void resendAssetTransfers() {
304: // BOZO - No support for removal of a local Asset
305: Collection changes = myLocalAssets.getChangedCollection();
306: if ((changes == null) || (changes.isEmpty())) {
307: return;
308: }
309:
310: for (Iterator iterator = changes.iterator(); iterator.hasNext();) {
311: Asset localAsset = (Asset) iterator.next();
312: // Determine whether or not asset transfers for the local asset should be
313: // resent. At this point, do not resend just because the relationship
314: // schedule changed. Warning - legtimate change will get lost if batched
315: // with relationship schedule changes unless a separate change report is
316: // generated.
317: // Also, do not resend just because a LocalPG changed. That is, the change must
318: // be anonymous or explicitly something other than RelationshipSchedule or LocalPG Change.
319: // OrgActivity changes that result in LocationSchedule changes (see the OplanWatcherLP)
320: // are such LocalPG changes that should not be propogated. The reasoning being that the
321: // AssetTransferLP / ReceiveAssetLP will just ignore any LocalPG changes, so why bother
322: // needlessly waking people up.
323: Collection changeReports = myLocalAssets
324: .getChangeReports(localAsset);
325: boolean resendRequired = false;
326:
327: if ((changeReports != AnonymousChangeReport.SET)
328: && (changeReports != null)) {
329: for (Iterator reportIterator = changeReports.iterator(); reportIterator
330: .hasNext();) {
331: ChangeReport report = (ChangeReport) reportIterator
332: .next();
333: if (!(report instanceof RelationshipSchedule.RelationshipScheduleChangeReport || report instanceof LocalPG.LocalPGChangeReport)) {
334: resendRequired = true;
335: break;
336: }
337: }
338: } else {
339: resendRequired = true;
340: }
341: if (resendRequired) {
342: resendAssetTransfers(localAsset, myAssetTransfers
343: .getCollection(), changeReports);
344: }
345: }
346: }
347:
348: /**
349: Resend a collection of AssetTransfers. Asset transfers are "sent"
350: by doing a publishChange. The change reports are supplied by the
351: caller, but are just the change reports of the change that
352: initiated this resend. Only transfers of our Asset to
353: other Assets are sent, the rest are ignored.
354: **/
355: private void resendAssetTransfers(Asset localAsset,
356: Collection transfers, Collection changeReports) {
357: for (Iterator i = transfers.iterator(); i.hasNext();) {
358: AssetTransfer at = (AssetTransfer) i.next();
359: if (at.getAsset().equals(localAsset)) {
360: if (at.getAssignee().equals(localAsset)) {
361: if (myLogger.isDebugEnabled()) {
362: myLogger
363: .debug(getAgentIdentifier()
364: + " resendAssetTransfers: not resending "
365: + at);
366: }
367: } else {
368: if (myLogger.isInfoEnabled())
369: myLogger
370: .info(getAgentIdentifier()
371: + " IS resending AssetTransfer of self to "
372: + at.getAssignee());
373: at.indicateAssetChange();
374: getBlackboardService().publishChange(at,
375: changeReports);
376: }
377: }
378: }
379: }
380:
381: // #######################################################################
382: // BEGIN predicates
383: // #######################################################################
384:
385: private static class AssetPredicate implements UnaryPredicate {
386: private final Object key;
387:
388: public AssetPredicate(Object key) {
389: this .key = key;
390: }
391:
392: public boolean execute(Object o) {
393: return ((o instanceof Asset) && (((Asset) o).getKey()
394: .equals(key)));
395: }
396: }
397:
398: // predicate for getting allocatable tasks of report for duty
399: private static final UnaryPredicate allReportTaskPred = new AllReportTaskPredicate();
400:
401: private static class AllReportTaskPredicate implements
402: UnaryPredicate {
403: public boolean execute(Object o) {
404: if (o instanceof Task) {
405: Task task = (Task) o;
406: if (task.getVerb().equals(Constants.Verb.REPORT)) {
407: return true;
408: }
409: }
410: return false;
411: }
412: }
413:
414: private static final UnaryPredicate allReportAssetTransferPred = new AllReportAssetTransferPredicate();
415:
416: private static class AllReportAssetTransferPredicate implements
417: UnaryPredicate {
418: public boolean execute(Object o) {
419: if (o instanceof AssetTransfer) {
420: Task t = ((AssetTransfer) o).getTask();
421: if (t.getVerb().equals(Constants.Verb.REPORT)) {
422: // if the PlanElement is for the correct kind of task then
423: // make sure it's an assettransfer
424: return true;
425: }
426: }
427: return false;
428: }
429: }
430:
431: private static class SelfOrgPredicate implements UnaryPredicate {
432: private final MessageAddress myAID;
433:
434: public SelfOrgPredicate(MessageAddress aid) {
435: super ();
436: myAID = aid;
437: }
438:
439: public boolean execute(Object o) {
440: if ((o instanceof Asset) && (o instanceof HasRelationships)
441: && (((Asset) o).hasClusterPG())) {
442: return ((Asset) o).getClusterPG().getMessageAddress()
443: .equals(myAID);
444: } else {
445: return false;
446: }
447: }
448: }
449: }
|