001: /*
002: * <copyright>
003: *
004: * Copyright 1999-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.glm.plugins.projection;
027:
028: import org.cougaar.core.blackboard.IncrementalSubscription;
029: import org.cougaar.glm.ldm.Constants;
030: import org.cougaar.glm.ldm.asset.Organization;
031: import org.cougaar.glm.ldm.plan.ObjectScheduleElement;
032: import org.cougaar.glm.plugins.AssetUtils;
033: import org.cougaar.glm.plugins.BasicProcessor;
034: import org.cougaar.glm.plugins.DecorationPlugin;
035: import org.cougaar.glm.plugins.TaskUtils;
036: import org.cougaar.glm.plugins.TimeUtils;
037: import org.cougaar.planning.ldm.asset.Asset;
038: import org.cougaar.planning.ldm.measure.CostRate;
039: import org.cougaar.planning.ldm.measure.CountRate;
040: import org.cougaar.planning.ldm.measure.FlowRate;
041: import org.cougaar.planning.ldm.measure.MassTransferRate;
042: import org.cougaar.planning.ldm.measure.Rate;
043: import org.cougaar.planning.ldm.measure.TimeRate;
044: import org.cougaar.planning.ldm.plan.PlanElement;
045: import org.cougaar.planning.ldm.plan.PrepositionalPhrase;
046: import org.cougaar.planning.ldm.plan.Schedule;
047: import org.cougaar.planning.ldm.plan.Task;
048: import org.cougaar.util.UnaryPredicate;
049: import org.cougaar.util.log.Logger;
050: import org.cougaar.util.log.Logging;
051:
052: import java.util.Enumeration;
053: import java.util.Hashtable;
054: import java.util.Iterator;
055: import java.util.Vector;
056:
057: /**
058: * Projects demand for assets consumed by some consumer.
059: */
060: public abstract class GenerateDemandExpander extends BasicProcessor {
061:
062: /** Plugin Parameter for creation of Supply instead of ProjectSupply tasks.
063: Values true/false, e.g. MyProjectionPlugin(+BulkWaterSupplyTasks) */
064: public final static String create_supply_tasks_param = "SupplyTasks";
065:
066: /** Subscription for GenerateDemand tasks. */
067: protected IncrementalSubscription generateDemandSubscription_;
068: /** Subscription to changes in publications by this processor */
069: protected UnaryPredicate ownTasksPredicate_;
070: protected IncrementalSubscription ownSubscription_;
071: protected UnaryPredicate ownTasksPEPredicate_;
072: protected IncrementalSubscription ownPESubscription_;
073: /** Subscription to policies */
074: protected IncrementalSubscription policySubscription_;
075: protected Hashtable policyTable_;
076: protected Vector resourceTypes_;
077: /** Table used for Diff-Based replanning of ProjectSupply tasks */
078: protected Hashtable publishedProjectionTable_;
079: private static Logger logger = Logging
080: .getLogger(GenerateDemandExpander.class);
081:
/**
 * Creates the expander and registers its subscriptions.
 *
 * @param pi plugin passed to the BasicProcessor constructor
 * @param org organization passed to the BasicProcessor constructor
 * @param types resource type names used to select generate-demand tasks
 * @param pred predicate selecting the tasks published by this processor
 */
public GenerateDemandExpander(DecorationPlugin pi,
        Organization org, Vector types, UnaryPredicate pred) {
    super(pi, org);
    resourceTypes_ = types;
    ownTasksPredicate_ = pred;
    ownTasksPEPredicate_ = new TasksPlanElementPredicate(
            ownTasksPredicate_);
    // Set up all tables before calling initializeSubscriptions(): that
    // method is overridable, so a subclass version must not observe
    // half-initialized state.
    publishedProjectionTable_ = new Hashtable();
    // Start with an empty policy table so policy lookups made before the
    // first update() do not hit a null table.
    policyTable_ = new Hashtable();
    initializeSubscriptions();
}
092:
093: static class TasksPlanElementPredicate implements UnaryPredicate {
094: UnaryPredicate taskPredicate_;
095:
096: public TasksPlanElementPredicate(UnaryPredicate task_predicate) {
097: taskPredicate_ = task_predicate;
098: }
099:
100: public boolean execute(Object o) {
101: if (o instanceof PlanElement) {
102: return taskPredicate_.execute(((PlanElement) o)
103: .getTask());
104: }
105: return false;
106: }
107: }
108:
/** Create Demand Task.
 * Implemented by subclasses. Within this class it is called by
 * createPeriodicDemandTasks() for each aggregation period whose total
 * demand is greater than zero; a null result is skipped by that caller.
 * @param parent_task task being expanded
 * @param resource asset that is consumed
 * @param consumer consumer of the resource; createPeriodicDemandTasks()
 *        passes the direct object of the parent task
 * @param end_time time for the task; createPeriodicDemandTasks() passes
 *        the start of the aggregation period, in the time base of the
 *        rate schedule
 * @param quantity of resource requested
 * @return 'demand' task for given resource
 */
protected abstract Task newDemandTask(Task parent_task,
Asset resource, Object consumer, long end_time,
double quantity);
116:
/** Create Projection Task.
 * Implemented by subclasses. Within this class it is called by
 * createConstantParameterDemandTasks() for each schedule element that
 * carries a non-null rate; a null result is skipped by that caller.
 * @param parent_task task being expanded
 * @param resource asset that is consumed
 * @param consumer consumer of the resource; the caller passes the
 *        direct object of the parent task
 * @param start_time start time of the schedule element
 * @param end_time end time of the schedule element
 * @param rate of consumption over time period
 * @param multiplier value obtained from the ConsumerSpec for the resource
 *        and passed through unchanged
 * @return projection task for given resource
 */
protected abstract Task newProjectionTask(Task parent_task,
Asset resource, Object consumer, long start_time,
long end_time, Rate rate, double multiplier);
124:
125: static class GenerateDemandPredicate implements UnaryPredicate {
126: Vector resourceTypes_;
127:
128: public GenerateDemandPredicate(Vector types) {
129: resourceTypes_ = types;
130: }
131:
132: public boolean execute(Object o) {
133: if (o instanceof Task) {
134: Task t = (Task) o;
135: if (t.getVerb().equals(
136: Constants.Verb.GENERATEPROJECTIONS)) {
137: PrepositionalPhrase pp = t
138: .getPrepositionalPhrase(Constants.Preposition.OFTYPE);
139: if (pp != null) {
140: Object obj = pp.getIndirectObject();
141: if (obj instanceof String) {
142: String type = (String) obj;
143: Enumeration en = resourceTypes_.elements();
144: while (en.hasMoreElements()) {
145: if (type.equals((String) en
146: .nextElement()))
147: return true;
148: }
149: }
150: }
151: }
152: }
153: return false;
154: }
155: }
156:
157: static class PolicyPredicate implements UnaryPredicate {
158: public boolean execute(Object o) {
159: if (o instanceof DemandProjectionPolicy) {
160: return true;
161: }
162: return false;
163: }
164: }
165:
166: /**
167: * Subscribe for generate demand tasks.
168: */
169: protected void initializeSubscriptions() {
170: generateDemandSubscription_ = subscribe(new GenerateDemandPredicate(
171: resourceTypes_));
172: ownSubscription_ = subscribe(ownTasksPredicate_);
173: ownPESubscription_ = subscribe(ownTasksPEPredicate_);
174: // DemandProjectionPolicies have two rules
175: // - max number of resources - now obsolete in Ants because done by IcisMEILDMPlugin
176: // - aggregation period - number of days between output demand tasks
177: // going away with true projection tasks
178: policySubscription_ = subscribe(new PolicyPredicate());
179: }
180:
181: /**
182: **/
183: public void update() {
184: updateGenerateDemandExpansion();
185: createPolicyTable(policySubscription_.elements());
186: publishedProjectionTable_.clear();
187:
188: if (isSubscriptionChanged(policySubscription_)) {
189: // reprocess everything
190: processTasks(generateDemandSubscription_.elements());
191: } else if (isSubscriptionChanged(generateDemandSubscription_)) {
192: // process new or changed items
193: processTasks(generateDemandSubscription_.getAddedList());
194: processTasks(generateDemandSubscription_.getChangedList());
195: }
196: }
197:
198: /** Fill in the policyTable_ with current DemandProjectionPolicies */
199: protected void createPolicyTable(Enumeration policies) {
200: DemandProjectionPolicy pol;
201: String type;
202:
203: policyTable_ = new Hashtable();
204: while (policies.hasMoreElements()) {
205: pol = (DemandProjectionPolicy) policies.nextElement();
206: type = pol.getItemConsumed();
207: policyTable_.put(type, pol);
208: }
209: }
210:
211: /**
212: * Expands the GenerateDemand tasks,
213: * creates and publishes the expanded task
214: * with the associated workflow.
215: * @param tasks tasks to be processed
216: * @return boolean - is there an error condition
217: */
218: public boolean processTasks(Enumeration tasks) {
219: Task task;
220: boolean is_failed = false;
221: while (tasks.hasMoreElements()) {
222: task = (Task) tasks.nextElement();
223: int num_demand_tasks = createResourceConsumptionTasks(task);
224: if (num_demand_tasks == 0) {
225: if (logger.isDebugEnabled()) {
226: logger
227: .debug("processTask: demands, but no demand tasks "
228: + TaskUtils.taskDesc(task));
229: }
230: is_failed = true;
231: continue;
232: }
233: // printDebug("processTask: expand "+TaskUtils.taskDesc(task)+
234: // " into "+num_demand_tasks+" tasks.");
235: }
236: return is_failed;
237: }
238:
239: /**
240: * Creates demand tasks for each consumed resource.
241: * @param parent_task
242: * @return Vector of demand tasks
243: * Converts the task's ConsumerSpec into resourceConsumptionRateSchedules
244: * which is then used to create a series of demand tasks. **/
245: protected int createResourceConsumptionTasks(Task parent_task) {
246: Asset resource;
247: Schedule rate_schedule;
248: Vector resource_demand = new Vector();
249: ConsumerSpec demand_spec = getDemandSpec(parent_task);
250: int num_parts = 0;
251: int total_tasks = 0;
252: Enumeration consumed_resources = demand_spec.getConsumed();
253:
254: while (consumed_resources.hasMoreElements()) {
255: resource = (Asset) consumed_resources.nextElement();
256: rate_schedule = demand_spec
257: .buildConsumptionRateSchedule(resource);
258: if (!rate_schedule.isEmpty()) {
259: Double mult = demand_spec.getMultiplier(resource);
260: Vector theseTasks = createDemandTasks(parent_task,
261: resource, rate_schedule, mult.doubleValue());
262: if (theseTasks.size() > 0) {
263: Task t = (Task) theseTasks.get(0);
264: if (t.getVerb()
265: .equals(Constants.Verb.PROJECTSUPPLY)) {
266: publishChangeProjection(parent_task, resource,
267: theseTasks.elements());
268: total_tasks += theseTasks.size();
269: } else {
270: resource_demand.addAll(theseTasks);
271: }
272: }
273: }
274: num_parts++;
275: }
276: int num_tasks = resource_demand.size();
277: if (num_tasks > 0) {
278: publishExpansion(parent_task, resource_demand);
279: total_tasks += num_tasks;
280: }
281: // printDebug(3, "createResourceConsumptionTasks for "+demand_spec.getConsumedType()+
282: // " for "+num_parts+" resources.");
283: return total_tasks;
284: }
285:
286: /**
287: * Create PROJECTSUPPLY tasks representing the demand.
288: * @param parent_task parent task (GenerateDemand)
289: * @return Vector of PROJECTSUPPLY tasks */
290: public Vector createDemandTasks(Task parent_task, Asset resource,
291: Schedule rate_schedule, double multiplier) {
292: if (rate_schedule.isEmpty())
293: return new Vector(0);
294: if (createSupplyTasksForType(parent_task)) {
295: if (logger.isDebugEnabled()) {
296: logger
297: .debug("createDemandTasks(), creating supply tasks for "
298: + TaskUtils
299: .getTaskItemName(parent_task));
300: }
301: return createPeriodicDemandTasks(parent_task, resource,
302: rate_schedule);
303: }
304: if (logger.isDebugEnabled()) {
305: logger
306: .debug("createDemandTasks(), creating projection tasks for "
307: + TaskUtils.getTaskItemName(parent_task));
308: }
309: return createConstantParameterDemandTasks(parent_task,
310: resource, rate_schedule, multiplier);
311: }
312:
313: /**
314: * Create SUPPLY tasks representing the demand.
315: * @param parent_task parent task (GenerateDemand)
316: * @return Vector of SUPPLY tasks */
317: protected Vector createPeriodicDemandTasks(Task parent_task,
318: Asset resource, Schedule rate_schedule) {
319: Vector supply_tasks = new Vector();
320:
321: Asset consumer = (Asset) parent_task.getDirectObject();
322: int aggregation_period = getAggregationPeriod(resource
323: .getTypeIdentificationPG().getTypeIdentification());
324: long aggregation_time = aggregation_period * MSEC_PER_DAY;
325:
326: int num_tasks = 0;
327: Task task;
328: double qty;
329: long sched_end = rate_schedule.getEndTime();
330: long task_end = rate_schedule.getStartTime();
331: while (task_end < sched_end) {
332: qty = getTotalDemand(rate_schedule, task_end, task_end
333: + aggregation_time);
334: if (qty > 0) {
335: task = newDemandTask(parent_task, resource, consumer,
336: task_end, qty);
337: if (task != null) {
338: supply_tasks.addElement(task);
339: num_tasks++;
340: }
341: }
342: task_end += aggregation_time;
343: }
344: if (logger.isDebugEnabled()) {
345: logger.debug("Created " + num_tasks + " "
346: + AssetUtils.assetDesc(resource)
347: + " demand tasks for consumer: "
348: + AssetUtils.assetDesc(consumer));
349: }
350: return supply_tasks;
351: }
352:
353: /**
354: * Create PROJECTSUPPLY tasks representing the demand.
355: * @param parent_task parent task (GenerateDemand)
356: * @return Vector of POJECTSUPPLY tasks */
357: protected Vector createConstantParameterDemandTasks(
358: Task parent_task, Asset resource, Schedule rate_schedule,
359: double multiplier) {
360: Vector projection_tasks = new Vector();
361:
362: Asset consumer = (Asset) parent_task.getDirectObject();
363: int num_tasks = 0;
364: // RateScheduleElement rse;
365: ObjectScheduleElement rse;
366: Task task;
367: Rate rate;
368: long task_start, task_end;
369: long now = TimeUtils.pushToEndOfDay(calendar_, getAlpTime());
370: Enumeration elements = rate_schedule.getAllScheduleElements();
371: while (elements.hasMoreElements()) {
372: // rse = (RateScheduleElement)elements.nextElement();
373: rse = (ObjectScheduleElement) elements.nextElement();
374: task_start = rse.getStartTime();
375: task_end = rse.getEndTime();
376: if (task_end < now) {
377: // Don't do anything with tasks in the past
378: continue;
379: }
380: // rate = rse.getRate();
381: rate = (Rate) rse.getObject();
382: // old
383: // rate = (rse.getQuantity() * ((task_end - qse.getStartTime())/MSEC_PER_DAY);
384: if (rate != null) {
385: task = newProjectionTask(parent_task, resource,
386: consumer, task_start, task_end, rate,
387: multiplier);
388: if (task != null) {
389: projection_tasks.addElement(task);
390: num_tasks++;
391: }
392: }
393: }
394: if (logger.isDebugEnabled()) {
395: logger.debug("Created " + num_tasks + " "
396: + AssetUtils.assetDesc(resource)
397: + " demand tasks for consumer: "
398: + AssetUtils.assetDesc(consumer));
399: }
400: return projection_tasks;
401: }
402:
403: /**
404: * @param sched Object schedule where Object is a rate
405: * @param start start time
406: * @param end end time
407: * @return amount used over given time period (rate*time)
408: **/
409: protected double getTotalDemand(Schedule sched, long start, long end) {
410: double total_demand = 0;
411: Iterator iter = sched
412: .getOverlappingScheduleElements(start, end).iterator();
413:
414: // RateScheduleElement rse;
415: ObjectScheduleElement rse;
416: Rate rate;
417: double duration;
418: long start_time, end_time;
419: double total_rate = 0;
420: while (iter.hasNext()) {
421: // rse = (RateScheduleElement)iter.next();
422: rse = (ObjectScheduleElement) iter.next();
423: start_time = rse.getStartTime();
424: if (start_time < start)
425: start_time = start;
426: end_time = rse.getEndTime();
427: if (end_time > end)
428: end_time = end;
429: // duration in days;
430: duration = (end_time - start_time) / (double) MSEC_PER_DAY;
431: // rate in days
432: // rate = rse.getRate();
433: rate = (Rate) rse.getObject();
434: if (rate instanceof CostRate) {
435: total_demand = ((CostRate) rate).getDollarsPerDay()
436: * duration;
437: } else if (rate instanceof CountRate) {
438: total_demand = ((CountRate) rate).getEachesPerDay()
439: * duration;
440: } else if (rate instanceof FlowRate) {
441: total_demand = ((FlowRate) rate).getGallonsPerDay()
442: * duration;
443: } else if (rate instanceof MassTransferRate) {
444: total_demand = ((MassTransferRate) rate)
445: .getShortTonsPerDay()
446: * duration;
447: } else if (rate instanceof TimeRate) {
448: total_demand = ((TimeRate) rate).getHoursPerDay()
449: * duration;
450: } else {
451: if (logger.isErrorEnabled()) {
452: logger
453: .error("getTotalDemand(), Unknown Rate type found : "
454: + rate);
455: }
456: }
457: // total_demand += rate.getPerDay()*duration;
458: }
459: return total_demand;
460:
461: }
462:
463: protected void updateGenerateDemandExpansion() {
464: // check allocation results
465: if (ownPESubscription_.elements().hasMoreElements()) {
466: if (logger.isDebugEnabled()) {
467: logger
468: .debug("updateGenerateDemandExpansion() has elements() added = "
469: + ownPESubscription_.getAddedList()
470: .hasMoreElements()
471: + " changed = "
472: + ownPESubscription_.getChangedList()
473: .hasMoreElements());
474: }
475: }
476: updateExpansionResult(ownPESubscription_.elements());
477: }
478:
479: /** Convenience method
480: * @param task GenerateDemand task that has demandSpec
481: * @return ConsumerSpec from PrepPhrase "DemandSpec"
482: **/
483: protected ConsumerSpec getDemandSpec(Task task) {
484: PrepositionalPhrase pp = task
485: .getPrepositionalPhrase("DemandSpec");
486: if (pp == null) {
487: if (logger.isErrorEnabled()) {
488: logger
489: .error("getDemandSpec badly formed task, no demand spec "
490: + task);
491: }
492: return null;
493: }
494: return (ConsumerSpec) pp.getIndirectObject();
495: }
496:
497: /**
498: * Convenience function
499: * @param type type id of the resource
500: * @return number of days between orders for resources.
501: * Defaults to one day.
502: **/
503: protected int getAggregationPeriod(String type) {
504: DemandProjectionPolicy policy = (DemandProjectionPolicy) policyTable_
505: .get(type);
506: if (policy == null) {
507: // default to once daily
508: return 1;
509: }
510: return policy.getDaysBetweenDemand();
511: }
512:
513: private boolean createSupplyTasksForType(Task parent_task) {
514: boolean result = false;
515: PrepositionalPhrase pp = parent_task
516: .getPrepositionalPhrase(Constants.Preposition.OFTYPE);
517: if (pp != null) {
518: Object obj = pp.getIndirectObject();
519: if (obj instanceof String) {
520: Boolean bool = (Boolean) plugin_.getParam((String) obj
521: + create_supply_tasks_param);
522: result = (bool != null && bool.booleanValue());
523: if (logger.isDebugEnabled()) {
524: logger
525: .debug("createSupplyTasksForType(), Create Supply tasks for "
526: + (String) obj + ", " + bool);
527: }
528: }
529: }
530: return result;
531: }
532:
533: protected void publishChangeProjection(Task parent_task,
534: Asset resource, Enumeration new_tasks) {
535:
536: // ownSubscription_ may contain both Supply and ProjectSupply tasks
537: if (publishedProjectionTable_.isEmpty()) {
538: cachePublishedProjections();
539: }
540: Enumeration tasks_to_publish = null;
541: Vector published_tasks = null;
542: published_tasks = (Vector) publishedProjectionTable_
543: .get(resource.getTypeIdentificationPG()
544: .getTypeIdentification());
545:
546: if (!new_tasks.hasMoreElements() && (published_tasks == null)) {
547: // No new tasks and no tasks from previous run. nothing to do.
548: return;
549: } else if (new_tasks.hasMoreElements()
550: && (published_tasks != null)) {
551: Schedule published_schedule = newObjectSchedule(published_tasks
552: .elements());
553: Schedule newtask_schedule = newObjectSchedule(new_tasks);
554: tasks_to_publish = diffProjections(published_schedule,
555: newtask_schedule);
556: } else if (new_tasks.hasMoreElements()) {
557: tasks_to_publish = new_tasks;
558: } else {
559: // Saw demand for asset in previous run but no demand in new run, remove old tasks
560: Enumeration e = published_tasks.elements();
561: while (e.hasMoreElements()) {
562: Task task = (Task) e.nextElement();
563: plugin_.publishRemoveFromExpansion(task);
564: }
565: }
566: if (tasks_to_publish != null) {
567: Task task;
568: while (tasks_to_publish.hasMoreElements()) {
569: task = (Task) tasks_to_publish.nextElement();
570: plugin_.publishAddToExpansion(parent_task, task);
571: // printDebug("publishChangeProjection(), Publishing new Projections: "+
572: // TaskUtils.projectionDesc(task));
573: }
574: }
575:
576: }
577:
578: private void cachePublishedProjections() {
579: Enumeration e = ownSubscription_.elements();
580: Task task;
581: String assetID;
582: Vector v;
583: // create table of projection tasks hashed on DO Asset
584: while (e.hasMoreElements()) {
585: task = (Task) e.nextElement();
586: if (task.getVerb().equals(Constants.Verb.PROJECTSUPPLY)) {
587: assetID = task.getDirectObject()
588: .getTypeIdentificationPG()
589: .getTypeIdentification();
590: v = (Vector) publishedProjectionTable_.get(assetID);
591: if (v == null) {
592: v = new Vector();
593: publishedProjectionTable_.put(assetID, v);
594: }
595: v.add(task);
596: }
597: }
598: }
599:
600: }
|