0001: /*
0002: * <copyright>
0003: *
0004: * Copyright 1997-2004 BBNT Solutions, LLC
0005: * under sponsorship of the Defense Advanced Research Projects
0006: * Agency (DARPA).
0007: *
0008: * You can redistribute this software and/or modify it under the
0009: * terms of the Cougaar Open Source License as published on the
0010: * Cougaar Open Source Website (www.cougaar.org).
0011: *
0012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023: *
0024: * </copyright>
0025: */
0026:
0027: package org.cougaar.logistics.plugin.demand;
0028:
0029: import org.cougaar.core.blackboard.IncrementalSubscription;
0030: import org.cougaar.core.component.ServiceRevokedEvent;
0031: import org.cougaar.core.component.ServiceRevokedListener;
0032: import org.cougaar.core.mts.MessageAddress;
0033: import org.cougaar.core.plugin.ComponentPlugin;
0034: import org.cougaar.core.service.DomainService;
0035: import org.cougaar.core.service.AgentIdentificationService;
0036: import org.cougaar.core.service.LoggingService;
0037: import org.cougaar.core.service.QuiescenceReportService;
0038: import org.cougaar.core.service.AgentIdentificationService;
0039: import org.cougaar.logistics.ldm.Constants;
0040: import org.cougaar.glm.ldm.asset.Organization;
0041: import org.cougaar.glm.ldm.oplan.Oplan;
0042: import org.cougaar.logistics.plugin.inventory.AssetUtils;
0043: import org.cougaar.logistics.plugin.inventory.LogisticsOPlan;
0044: import org.cougaar.logistics.plugin.inventory.TaskUtils;
0045: import org.cougaar.logistics.plugin.inventory.TimeUtils;
0046: import org.cougaar.logistics.plugin.inventory.UtilsProvider;
0047: import org.cougaar.logistics.plugin.inventory.LogisticsPlanModule;
0048: import org.cougaar.logistics.plugin.inventory.LogisticsPlan;
0049: import org.cougaar.logistics.plugin.utils.LogisticsOPlanPredicate;
0050: import org.cougaar.logistics.plugin.utils.OrgActivityPred;
0051: import org.cougaar.logistics.plugin.utils.ScheduleUtils;
0052: import org.cougaar.logistics.plugin.utils.TaskScheduler;
0053: import org.cougaar.logistics.plugin.utils.TaskSchedulingPolicy;
0054: import org.cougaar.logistics.plugin.utils.QuiescenceAccumulator;
0055: import org.cougaar.planning.ldm.PlanningFactory;
0056: import org.cougaar.planning.ldm.asset.AggregateAsset;
0057: import org.cougaar.planning.ldm.asset.Asset;
0058: import org.cougaar.planning.ldm.asset.PropertyGroup;
0059: import org.cougaar.planning.ldm.plan.*;
0060: import org.cougaar.planning.plugin.util.PluginHelper;
0061: import org.cougaar.util.Filters;
0062: import org.cougaar.util.TimeSpan;
0063: import org.cougaar.util.DynamicUnaryPredicate;
0064: import org.cougaar.util.UnaryPredicate;
0065: import org.cougaar.glm.ldm.oplan.OrgActivity;
0066:
0067: import java.lang.reflect.Constructor;
0068: import java.lang.reflect.InvocationTargetException;
0069: import java.lang.reflect.Method;
0070: import java.util.*;
0071:
0072: /**
0073: * The DemandForecastPlugin is the Glue of demand generation.
0074: * It handles all blackboard services for its modules,
0075: * facilitates inter-module communication and manages the
0076: * subscriptions.
0077: * All modules are called from the DemandForecastPlugin.
0078: */
0079:
0080: public class DemandForecastPlugin extends ComponentPlugin implements
0081: UtilsProvider {
0082:
// ---- Services and helper utilities; all of these are assigned in load(). ----
private DomainService domainService;
private AgentIdentificationService ais;
private LoggingService logger;
private TaskUtils taskUtils;
private TimeUtils timeUtils;
// NOTE(review): this field shares its name with the AssetUtils class; a
// lowerCamelCase name would be clearer but needs a coordinated rename.
private AssetUtils AssetUtils;
private ScheduleUtils scheduleUtils;
// Key/value plugin parameters as returned by readParameters().
private HashMap pluginParams;
// PropertyGroup -> Collection of predicates registered for that PG.
private HashMap pgToPredsHash;
// PropertyGroup -> the GenerateProjections task whose asset carries that PG.
private HashMap pgToGPTaskHash;
// IncrementalSubscription -> Collection of PropertyGroups interested in it.
private HashMap subToPGsHash;
// Predicate -> IncrementalSubscription created for that predicate.
private HashMap predToSubHash;
// Set in setupSubscriptions() when the blackboard rehydrated; cleared at the
// end of the first execute() that gets as far as rehydrateHashMaps().
private boolean rehydrate = false;

// Not referenced in the visible part of this file.
private boolean dfpQuiescenceState = true;

// Both are populated by readParameters() from SUPPLY_TYPE / SUPPLY_PG_CLASS.
private String supplyType;
private Class supplyClassPG;

// Looked up lazily in execute() from the selfOrganizations subscription.
private Organization myOrganization;
// presumably cached by getOrgName(); not assigned in the visible code - verify.
private String myOrgName;
private DetReqExpanderIfc determineRequirementsExpander;
private GenProjExpanderIfc generateProjectionsExpander;
//    private SchedulerModule planningScheduler;

// True when the TASK_SCHEDULER_OFF parameter is "true"; see setupTaskScheduler().
private boolean turnOffTaskSched = false;

// True once the DetermineRequirements task has a plan element or has been
// expanded by processDetReq().
private boolean processedDetReq = false;

private LogisticsPlanModule logisticsPlan;

// Keys accepted in the plugin parameter list ("KEY=value").
public final String SUPPLY_TYPE = "SUPPLY_TYPE";
public final String SUPPLY_PG_CLASS = "SUPPLY_PG_CLASS";
public final String REQ_EXPANDER = "REQ_EXPANDER";
public final String PROJ_EXPANDER = "PROJ_EXPANDER";
public final String TASK_SCHEDULER_OFF = "TASK_SCHEDULER_OFF";

// Refreshed on every execute() from logisticsPlan.updateOrgActivities(...);
// reset to null when its arrival time is still Long.MIN_VALUE.
LogisticsOPlan logOPlan = null;

// Not referenced in the visible part of this file.
protected long executePeriod = -1;
0123:
0124: public void load() {
0125: super .load();
0126: logger = getLoggingService(this );
0127: timeUtils = new TimeUtils(this );
0128: AssetUtils = new AssetUtils(this );
0129: taskUtils = new TaskUtils(this );
0130: scheduleUtils = new ScheduleUtils(this );
0131:
0132: //detReqHandler = new DetReqAggHandler(this);
0133: // readParameters() initializes supplyType and inventoryFile
0134: pluginParams = readParameters();
0135: determineRequirementsExpander = getDetermineRequirementsExpanderModule();
0136: generateProjectionsExpander = getGenerateProjectionsExpanderModule();
0137:
0138: logisticsPlan = new LogisticsPlan(getAgentIdentifier());
0139:
0140: pgToPredsHash = new HashMap();
0141: pgToGPTaskHash = new HashMap();
0142: subToPGsHash = new HashMap();
0143: predToSubHash = new HashMap();
0144:
0145: //startTime = currentTimeMillis();
0146:
0147: domainService = (DomainService) getServiceBroker().getService(
0148: this , DomainService.class,
0149: new ServiceRevokedListener() {
0150: public void serviceRevoked(ServiceRevokedEvent re) {
0151: if (DomainService.class.equals(re.getService()))
0152: domainService = null;
0153: }
0154: });
0155:
0156: ais = (AgentIdentificationService) getServiceBroker()
0157: .getService(this , AgentIdentificationService.class,
0158: null);
0159:
0160: // System.out.println("\n LOADING DemandForecastPlugin of type: " + supplyType +
0161: // "in org: " + getAgentIdentifier().toString() +
0162: // " this plugin is: " + this);
0163: }
0164:
0165: public void unload() {
0166: super .unload();
0167: if (domainService != null) {
0168: getServiceBroker().releaseService(this ,
0169: DomainService.class, domainService);
0170: }
0171: }
0172:
0173: public TaskUtils getTaskUtils() {
0174: return taskUtils;
0175: }
0176:
0177: public TimeUtils getTimeUtils() {
0178: return timeUtils;
0179: }
0180:
0181: public AssetUtils getAssetUtils() {
0182: return AssetUtils;
0183: }
0184:
0185: public ScheduleUtils getScheduleUtils() {
0186: return scheduleUtils;
0187: }
0188:
0189: public String getSupplyType() {
0190: return supplyType;
0191: }
0192:
0193: public Organization getMyOrganization() {
0194: return myOrganization;
0195: }
0196:
0197: public long getCurrentTimeMillis() {
0198: return currentTimeMillis();
0199: }
0200:
0201: public boolean publishAdd(Object o) {
0202: getBlackboardService().publishAdd(o);
0203: return true;
0204: }
0205:
0206: public void publishAddExpansion(Expansion expansion) {
0207: PluginHelper.publishAddExpansion(getBlackboardService(),
0208: expansion);
0209: }
0210:
0211: public boolean publishChange(Object o) {
0212: getBlackboardService().publishChange(o);
0213: return true;
0214: }
0215:
0216: public boolean publishRemove(Object o) {
0217: getBlackboardService().publishRemove(o);
0218: return true;
0219: }
0220:
0221: public PlanningFactory getPlanningFactory() {
0222: PlanningFactory rootFactory = null;
0223: if (domainService != null) {
0224: rootFactory = (PlanningFactory) domainService
0225: .getFactory("planning");
0226: }
0227: return rootFactory;
0228: }
0229:
0230: public LoggingService getLoggingService(Object requestor) {
0231: return (LoggingService) getServiceBroker().getService(
0232: requestor, LoggingService.class, null);
0233: }
0234:
0235: protected void execute() {
0236: if ((supplyClassPG == null) || (oplanSubscription.isEmpty())
0237: || (orgActivities.isEmpty())) {
0238: processedDetReq = false;
0239: return;
0240: }
0241:
0242: genProjTaskScheduler.initForExecuteCycle();
0243:
0244: if (!detReqSubscription.isEmpty()) {
0245: Iterator detReqIt = detReqSubscription.iterator();
0246: Task detReqTask = (Task) detReqIt.next();
0247: processedDetReq = (!(detReqTask.getPlanElement() == null));
0248:
0249: //There should be both a determineRequirements task
0250: //and an oplan before kicking off the expander for the first time.
0251: //from then on out we should be catching additional assets added, or removed.
0252: //It is also possible that this agent has no assets and the expander has to dispose of the detReqTask.
0253:
0254: //if there is a new determine requirements task or new oplan do this
0255: if (((!orgActivities.getAddedCollection().isEmpty()) && (!processedDetReq))
0256: || (!detReqSubscription.getAddedCollection()
0257: .isEmpty())) {
0258: processDetReq(detReqSubscription,
0259: assetsWithPGSubscription);
0260: }
0261: //otherwise just issue a new
0262: else if (!assetsWithPGSubscription.getAddedCollection()
0263: .isEmpty()) {
0264: processDetReq(detReqSubscription,
0265: assetsWithPGSubscription.getAddedCollection());
0266: } else if (!assetsWithPGSubscription.getRemovedCollection()
0267: .isEmpty()) {
0268: removeFromDetReq(detReqSubscription,
0269: assetsWithPGSubscription.getRemovedCollection());
0270: }
0271: }
0272:
0273: if (myOrganization == null) {
0274: myOrganization = getMyOrganization(selfOrganizations
0275: .elements());
0276: if (myOrganization != null) {
0277: projectSupplySubscription = (IncrementalSubscription) blackboard
0278: .subscribe(new ProjectSupplyPredicate(
0279: supplyType, getOrgName(), taskUtils));
0280: }
0281: }
0282:
0283: if (myOrganization == null) {
0284: if (logger.isInfoEnabled()) {
0285: logger.info("\n DemandForecastPlugin " + supplyType
0286: + " not ready to process tasks yet."
0287: + " my org is: " + myOrganization);
0288: }
0289: if (logger.isErrorEnabled()) {
0290: logger
0291: .error("DemandForecastPlugin: myOrganization is null, not ready to process tasks for "
0292: + ais.getMessageAddress().toString());
0293: }
0294: return;
0295: }
0296:
0297: // get the Logistics OPlan (our homegrown version with specific dates).
0298: logOPlan = logisticsPlan.updateOrgActivities(oplanSubscription,
0299: allOrgActivities);
0300:
0301: //Only after we have all the constituent parts to start going - oplan, orgActitivities, logOplan do we
0302: //lay down
0303: if (logOPlan != null) {
0304: if (logOPlan.getArrivalTime() != Long.MIN_VALUE) {
0305: if ((supplyClassPG != null)
0306: && (genProjTaskScheduler == null)) {
0307: setupTaskScheduler();
0308: //genProjSubscription = (IncrementalSubscription) blackboard.subscribe(new GenProjPredicate(supplyType, taskUtils));
0309: }
0310: } else {
0311: logOPlan = null;
0312: if (logger.isDebugEnabled()) {
0313: logger
0314: .debug("DemandForecastPlugin for "
0315: + ais.getMessageAddress()
0316: .toString()
0317: + " returning b/c logOPlan arrivalTime not set.");
0318: }
0319:
0320: return;
0321: }
0322: } else { // wait for logOPlan
0323: return;
0324: }
0325:
0326: // if (genProjSubscription != null) {
0327:
0328: HashSet justExpandedPGs = new HashSet();
0329:
0330: if (!genProjTaskScheduler.isEmpty()) {
0331: Collection removed = genProjTaskScheduler
0332: .getRemovedCollection();
0333: Collection added = genProjTaskScheduler
0334: .getAddedCollection();
0335: TimeSpan timeSpan = null;
0336: if (turnOffTaskSched) {
0337:
0338: long currentTime = currentTimeMillis();
0339:
0340: long newStartTime = getStartOfPeriod(currentTime);
0341: // timeSpan = new ScheduleElementImpl(oplan.getCday(),oplan.getEndDay());
0342: timeSpan = new ScheduleElementImpl(newStartTime,
0343: getLogOPlanEndTime());
0344: } else {
0345: //TODO WARNING - careful the timespan if the TaskScheduler is not being truncated by currentTimeMillis
0346: //like the above code. So if this is the first time through generating projections is may make projections
0347: //in the past.
0348: timeSpan = genProjTaskScheduler.getCurrentTimeSpan();
0349: }
0350: if (!removed.isEmpty())
0351: processRemovedGenProjs(removed, timeSpan);
0352: if (!added.isEmpty())
0353: justExpandedPGs = processNewGenProjs(added, timeSpan);
0354:
0355: genProjTaskScheduler.finishedExecuteCycle();
0356: }
0357: if (processedDetReq) {
0358: checkAndProcessHashSubscriptions(justExpandedPGs);
0359: }
0360:
0361: //Update the Allocation results on new or changed GP PlanElements
0362: if (genProjPESubscription.hasChanged()) {
0363: if (!genProjPESubscription.getAddedCollection().isEmpty()) {
0364: generateProjectionsExpander
0365: .updateAllocationResults(genProjPESubscription
0366: .getAddedCollection());
0367: }
0368: if (!genProjPESubscription.getChangedCollection().isEmpty()) {
0369: generateProjectionsExpander
0370: .updateAllocationResults(genProjPESubscription
0371: .getChangedCollection());
0372: }
0373: }
0374:
0375: //Update the Allocation results on new or changed DR PlanElements
0376: if (detReqPESubscription.hasChanged()) {
0377: if (!detReqPESubscription.getAddedCollection().isEmpty()) {
0378: determineRequirementsExpander
0379: .updateAllocationResults(detReqPESubscription
0380: .getAddedCollection());
0381: }
0382: if (!detReqPESubscription.getChangedCollection().isEmpty()) {
0383: determineRequirementsExpander
0384: .updateAllocationResults(detReqPESubscription
0385: .getChangedCollection());
0386: }
0387: }
0388:
0389: //deal with rehydration
0390: if (rehydrate) {
0391: rehydrateHashMaps();
0392: rehydrate = false;
0393: }
0394: }
0395:
// Blackboard subscriptions; unless noted otherwise they are created in
// setupSubscriptions().

// Org activities matched by OrgActivityPred; also registered in predToSubHash.
private IncrementalSubscription orgActivities;
// Org activities matched by OrgActivityPredicate; fed to the logistics plan.
private IncrementalSubscription allOrgActivities;
// Every Oplan object on the blackboard.
private IncrementalSubscription oplanSubscription;
// DetermineRequirements tasks of this plugin's supply type.
private IncrementalSubscription detReqSubscription;
// Plan elements whose task is a DetermineRequirements task of this supply type.
private IncrementalSubscription detReqPESubscription;
// Scheduler wrapping the GenerateProjections tasks; built by setupTaskScheduler().
private TaskScheduler genProjTaskScheduler;
// Plan elements whose task is a GenerateProjections task of this supply type.
private IncrementalSubscription genProjPESubscription;
// ProjectSupply tasks of this organization; created in execute() once the
// organization is known.
private IncrementalSubscription projectSupplySubscription;
//private IncrementalSubscription logisticsOPlanSubscription;

/**
 * Subscription for the Organization(s) in which this plugin resides
 * (organizations for which isSelf() is true).
 */
private IncrementalSubscription selfOrganizations;

/**
 * Subscription for all assets that have the plugin-parameter PG class
 * attached; stays null when no supply PG class could be loaded.
 */
private IncrementalSubscription assetsWithPGSubscription;
0415:
0416: protected void setupSubscriptions() {
0417: if (blackboard.didRehydrate()) {
0418: rehydrate = true;
0419: }
0420: selfOrganizations = (IncrementalSubscription) blackboard
0421: .subscribe(orgsPredicate);
0422:
0423: UnaryPredicate orgActivityPred = new OrgActivityPred();
0424: orgActivities = (IncrementalSubscription) blackboard
0425: .subscribe(orgActivityPred);
0426: allOrgActivities = (IncrementalSubscription) blackboard
0427: .subscribe(new OrgActivityPredicate());
0428: predToSubHash.put(orgActivityPred, orgActivities);
0429:
0430: oplanSubscription = (IncrementalSubscription) blackboard
0431: .subscribe(oplanPredicate);
0432:
0433: detReqSubscription = (IncrementalSubscription) blackboard
0434: .subscribe(new DetReqPredicate(supplyType, taskUtils));
0435: detReqPESubscription = (IncrementalSubscription) blackboard
0436: .subscribe(new DetReqPEPredicate(supplyType, taskUtils));
0437:
0438: genProjTaskScheduler = null;
0439: assetsWithPGSubscription = null;
0440:
0441: if (supplyClassPG != null) {
0442: //MWD took out
0443: setupTaskScheduler();
0444:
0445: //genProjSubscription = (IncrementalSubscription) blackboard.subscribe(new GenProjPredicate(supplyType, taskUtils));
0446:
0447: assetsWithPGSubscription = (IncrementalSubscription) getBlackboardService()
0448: .subscribe(new AssetOfTypePredicate(supplyClassPG));
0449: }
0450:
0451: genProjPESubscription = (IncrementalSubscription) blackboard
0452: .subscribe(new GenProjPEPredicate(supplyType, taskUtils));
0453:
0454: //logisticsOPlanSubscription = (IncrementalSubscription) blackboard.subscribe(new LogisticsOPlanPredicate());
0455: }
0456:
0457: private void setupTaskScheduler() {
0458: String taskScheduler = (String) pluginParams
0459: .get(TASK_SCHEDULER_OFF);
0460: if (taskScheduler != null) {
0461: turnOffTaskSched = new Boolean(taskScheduler)
0462: .booleanValue();
0463: } else {
0464: turnOffTaskSched = false;
0465: }
0466: QuiescenceReportService qrs = (QuiescenceReportService) getServiceBroker()
0467: .getService(this , QuiescenceReportService.class, null);
0468: // AgentIdentificationService ais = (AgentIdentificationService)
0469: // getServiceBroker().getService(this, AgentIdentificationService.class, null);
0470: qrs.setAgentIdentificationService(ais);
0471: QuiescenceAccumulator q = new QuiescenceAccumulator(qrs);
0472: String myId = getBlackboardClientName()
0473: + ais.getMessageAddress().toString() + supplyType
0474: + "DemandForecastPlugin";
0475: // if (myId.endsWith("SubsistenceDemandForecastPlugin")) {
0476: // logger.error ("DemandForecastPlugin created an id for TaskScheduler of :" +myId);
0477: // }
0478: if (!turnOffTaskSched) {
0479: if (logger.isDebugEnabled())
0480: logger.debug("DemandForecastor TASK SCHEDULER ON "
0481: + ais.getMessageAddress().toString()
0482: + getSupplyType());
0483: java.io.InputStream is = null;
0484: try {
0485: is = getConfigFinder().open("demandSchedPolicy.xml");
0486: } catch (Exception e) {
0487: if (logger.isErrorEnabled()) {
0488: logger
0489: .error("Could not find file demandSchedPolicy.xml");
0490: }
0491: }
0492: genProjTaskScheduler = new TaskScheduler(
0493: new GenProjPredicate(supplyType, taskUtils),
0494: TaskSchedulingPolicy.fromXML(is, this ,
0495: getAlarmService()), blackboard, q, logger,
0496: "GenProjs for " + getBlackboardClientName()
0497: + ais.getMessageAddress().toString()
0498: + supplyType + "DemandForecastPlugin");
0499: } else {
0500: if (logger.isDebugEnabled())
0501: logger.debug("DemandForecastor TASK SCHEDULER OFF "
0502: + ais.getMessageAddress().toString()
0503: + getSupplyType());
0504: genProjTaskScheduler = new TaskScheduler(
0505: new GenProjPredicate(supplyType, taskUtils),
0506: new TaskSchedulingPolicy(
0507: new TaskSchedulingPolicy.Predicate[] { TaskSchedulingPolicy.PASSALL }),
0508: blackboard, q, logger, "GenProjs for "
0509: + getBlackboardClientName()
0510: + ais.getMessageAddress().toString()
0511: + supplyType + "DemandForecastPlugin");
0512: }
0513: }
0514:
0515: private static UnaryPredicate orgsPredicate = new UnaryPredicate() {
0516: public boolean execute(Object o) {
0517: if (o instanceof Organization) {
0518: return ((Organization) o).isSelf();
0519: }
0520: return false;
0521: }
0522: };
0523:
0524: private static UnaryPredicate oplanPredicate = new UnaryPredicate() {
0525: public boolean execute(Object o) {
0526: return (o instanceof Oplan);
0527: }
0528: };
0529:
0530: /**
0531: * Predicate defining expandable Determine Reqs. *
0532: */
0533: private static class DetReqPredicate implements UnaryPredicate {
0534: private String supplyType;
0535: private TaskUtils taskUtils;
0536:
0537: public DetReqPredicate(String type, TaskUtils utils) {
0538: this .supplyType = type;
0539: this .taskUtils = utils;
0540: } // constructor
0541:
0542: public boolean execute(Object o) {
0543: if (o instanceof Task) {
0544: Task t = (Task) o;
0545: if (t.getVerb().equals(
0546: Constants.Verb.DETERMINEREQUIREMENTS)) {
0547: return taskUtils.isTaskOfType(t, supplyType);
0548: } // if
0549: } // if
0550: return false;
0551: } // execute
0552: } // DetReqPredicate
0553:
0554: /**
0555: * Predicate defining Determine Reqs PlanElements created in this plugin. *
0556: */
0557: private static class DetReqPEPredicate implements UnaryPredicate {
0558: private String supplyType;
0559: private TaskUtils taskUtils;
0560:
0561: public DetReqPEPredicate(String type, TaskUtils utils) {
0562: this .supplyType = type;
0563: this .taskUtils = utils;
0564: } // constructor
0565:
0566: public boolean execute(Object o) {
0567: if (o instanceof PlanElement) {
0568: Task t = ((PlanElement) o).getTask();
0569: if (t.getVerb().equals(
0570: Constants.Verb.DETERMINEREQUIREMENTS)) {
0571: return taskUtils.isTaskOfType(t, supplyType);
0572: } // if
0573: } // if
0574: return false;
0575: } // execute
0576: } // DetReqPEPredicate
0577:
0578: /**
0579: * Predicate defining expandable Determine Reqs. *
0580: */
0581: private static class GenProjPredicate implements
0582: TaskSchedulingPolicy.Predicate {
0583: private String supplyType;
0584: private TaskUtils taskUtils;
0585:
0586: public GenProjPredicate(String type, TaskUtils utils) {
0587: this .supplyType = type;
0588: this .taskUtils = utils;
0589: } // constructor
0590:
0591: public boolean execute(Task t) {
0592: return t.getVerb().equals(
0593: Constants.Verb.GENERATEPROJECTIONS)
0594: && taskUtils.isTaskOfTypeString(t, supplyType);
0595: } // execute
0596: } // GenProjPredicate
0597:
0598: /**
0599: * Predicate defining GenerateProjection PEs that this plugin created. *
0600: */
0601: private static class GenProjPEPredicate implements UnaryPredicate {
0602: private String supplyType;
0603: private TaskUtils taskUtils;
0604:
0605: public GenProjPEPredicate(String type, TaskUtils utils) {
0606: this .supplyType = type;
0607: this .taskUtils = utils;
0608: }
0609:
0610: public boolean execute(Object o) {
0611: if (o instanceof PlanElement) {
0612: Task t = ((PlanElement) o).getTask();
0613: if (t.getVerb().equals(
0614: Constants.Verb.GENERATEPROJECTIONS)) {
0615: return taskUtils.isTaskOfTypeString(t, supplyType);
0616: }
0617: }
0618: return false;
0619: }
0620: } // end GenProjPEPredicate
0621:
0622: /**
0623: * Predicate defining ProjectSupply tasks that this plugin created. *
0624: */
0625: private static class ProjectSupplyPredicate implements
0626: UnaryPredicate {
0627: private String supplyType;
0628: private String orgName;
0629: private TaskUtils taskUtils;
0630:
0631: public ProjectSupplyPredicate(String type, String myOrgName,
0632: TaskUtils utils) {
0633: this .supplyType = type;
0634: this .orgName = myOrgName;
0635: this .taskUtils = utils;
0636: }
0637:
0638: public boolean execute(Object o) {
0639: if (o instanceof Task) {
0640: Task t = (Task) o;
0641: if (t.getVerb().equals(Constants.Verb.PROJECTSUPPLY)) {
0642: if (taskUtils.isTaskOfTypeString(t, supplyType)) {
0643: return (taskUtils.isMyDemandForecastProjection(
0644: t, orgName));
0645: }
0646: }
0647: }
0648: return false;
0649: }
0650: } // end SupplyTaskPredicate
0651:
0652: private class AssetOfTypePredicate implements DynamicUnaryPredicate {
0653: private Class supplyPGClass;
0654:
0655: public AssetOfTypePredicate(Class pgClass) {
0656: this .supplyPGClass = pgClass;
0657: } // constructor
0658:
0659: /**
0660: * Predicate defining expandable Determine Reqs.
0661: */
0662: public boolean execute(Object o) {
0663: if (o instanceof Asset) {
0664: Asset a = (Asset) o;
0665: if (a instanceof AggregateAsset) {
0666: a = ((AggregateAsset) a).getAsset();
0667: }
0668: return (a.searchForPropertyGroup(supplyPGClass) != null);
0669: } // if
0670: return false;
0671: } // execute
0672: } // DetReqPredicate
0673:
0674: /**
0675: * Filters out tasks that already have PEs -- fix for bug #1695
0676: *
0677: * @param tasks - possibly from added list
0678: * @return Collection - tasks that have no PEs
0679: */
0680: protected Collection getTasksWithoutPEs(Collection tasks) {
0681:
0682: // I'm curious as to why we are using a hash set here? -- llg
0683: Set tasksWithoutPEs = new HashSet();
0684: for (Iterator iter = tasks.iterator(); iter.hasNext();) {
0685: Task task = (Task) iter.next();
0686:
0687: if (task.getPlanElement() != null) {
0688: if (logger.isDebugEnabled()) {
0689: logger
0690: .debug(getMyOrganization()
0691: + " - found task that already had a p.e. attached? : "
0692: + task.getUID()
0693: + " - so skipping it.");
0694: }
0695: } else {
0696: tasksWithoutPEs.add(task);
0697: }
0698: }
0699:
0700: return tasksWithoutPEs;
0701: }
0702:
0703: /**
0704: * Read the Plugin parameters(Accepts key/value pairs)
0705: * Initializes supplyType and inventoryFile
0706: */
0707: private HashMap readParameters() {
0708: final String errorString = "DemandForecastPlugin requires 2 parameters, Supply Type and associated SupplyPGClass. Additional parameter to change expander module. e.g. org.cougaar.logistics.plugin.inventory.DemandForecastPlugin("
0709: + SUPPLY_TYPE
0710: + "=BulkPOL, "
0711: + SUPPLY_PG_CLASS
0712: + "=FuelConsumerPG); Default package for SUPPLY_PG_CLASS is org.cougaar.logistics.ldm.asset. If PG is not in this package use fully qualified name.";
0713: Collection p = getParameters();
0714:
0715: if (p.isEmpty()) {
0716: if (logger.isErrorEnabled()) {
0717: logger.error(errorString);
0718: }
0719: return null;
0720: }
0721: HashMap map = new HashMap();
0722: int idx;
0723:
0724: for (Iterator i = p.iterator(); i.hasNext();) {
0725: String s = (String) i.next();
0726: if ((idx = s.indexOf('=')) != -1) {
0727: String key = new String(s.substring(0, idx));
0728: String value = new String(s.substring(idx + 1, s
0729: .length()));
0730: map.put(key.trim(), value.trim());
0731: }
0732: }
0733: supplyType = (String) map.get(SUPPLY_TYPE);
0734: String supplyClassPGStr = (String) map.get(SUPPLY_PG_CLASS);
0735: if (((supplyType == null) || (supplyClassPGStr == null) || (supplyClassPGStr
0736: .trim().equals(""))
0737: && logger.isErrorEnabled())) {
0738: logger.error(errorString);
0739: } else {
0740: if (supplyClassPGStr.indexOf(".") == -1) {
0741: supplyClassPGStr = "org.cougaar.logistics.ldm.asset."
0742: + supplyClassPGStr;
0743: }
0744: try {
0745: supplyClassPG = Class.forName(supplyClassPGStr);
0746: } catch (Exception e) {
0747: if (logger.isErrorEnabled()) {
0748: logger.error("Problem loading SUPPLY_PG_CLASS-"
0749: + supplyClassPGStr + "- exeception: " + e);
0750: }
0751: if (logger.isErrorEnabled()) {
0752: logger.error(errorString);
0753: }
0754: supplyClassPG = null;
0755: }
0756: }
0757: return map;
0758: }
0759:
0760: private void processDetReq(Collection addedDRs, Collection assets) {
0761: // with one oplan we should only have one DR task.
0762: Iterator drIt = addedDRs.iterator();
0763: if (drIt.hasNext()) {
0764: Task detReq = (Task) drIt.next();
0765: //synch on the detReq task so only one instance of this plugin
0766: // checks and creates a single agg task and then creates an
0767: // empty expansion (wf) for the maintain inventory for each item tasks
0768: synchronized (detReq) {
0769: determineRequirementsExpander
0770: .expandDetermineRequirements(detReq, assets);
0771: processedDetReq = true;
0772: }
0773: }
0774: }
0775:
0776: /**
0777: * This method processes the new GenerateProjection tasks. Basically it
0778: * just adds this tasks unique PG and GP and updates the hash maps
0779: * with the new information. We don't typically do the expansion here
0780: * because the newly added subscriptions (due to new PG) will be triggered
0781: * and caught by the checkAndProcessHashSubscriptions() method called later
0782: * in the same execute cycle, and expanded there.
0783: *
0784: * @param addedGPs - The collection of new GenerateProjections tasks
0785: */
0786:
0787: private HashSet processNewGenProjs(Collection addedGPs,
0788: TimeSpan timeSpan) {
0789: Iterator gpIt = addedGPs.iterator();
0790: HashSet justExpandedPGs = new HashSet();
0791: while (gpIt.hasNext()) {
0792: Task genProj = (Task) gpIt.next();
0793: Asset asset = genProj.getDirectObject();
0794: if (asset instanceof AggregateAsset) {
0795: asset = ((AggregateAsset) asset).getAsset();
0796: }
0797: PropertyGroup pg = asset
0798: .searchForPropertyGroup(supplyClassPG);
0799:
0800: pgToGPTaskHash.put(pg, genProj);
0801: justExpandedPGs.add(pg);
0802:
0803: /*
0804: * If there is a new pg (every new GP task has a new distinct MEI,
0805: * that has a distinct PG that has a distinct ConsumerPredicate),
0806: * add it to the hash tables, which will have the side effect
0807: * of subscribing the new ConsumerPredicate on the blackboard.
0808: * Later in the same execute cycle checkAndProcessHashSubscriptions()
0809: * will be called and fire for each new GP - because of the new ConsumerPredicate
0810: * firing.
0811: */
0812: if (!pgToPredsHash.containsKey(pg)) {
0813: addNewPG(pg);
0814: //We invoke GenProjections now because we do not expect any new subscriptions in the Hash table
0815: //to fire immediately. They should be just the orgActivities subscription at this point.
0816: invokeGenProjectionsExp(pg, genProj, timeSpan);
0817: }
0818: // For each new GP task it actually has a new unique PGImpl, so
0819: // this code should never be called. Especially if all the
0820: // hash tables are kept in line with whats on the blackboard.
0821: else {
0822: //with new TaskScheduler this is no longer a surprise, but is expected behavior.
0823: //logger.error("Surprise!!!! - unexpected expansion code firing in processNewGenProjs");
0824: invokeGenProjectionsExp(pg, genProj, timeSpan);
0825: }
0826: }
0827: return justExpandedPGs;
0828: }
0829:
0830: /**
0831: * This method keeps all the state hashTables in line with the GPTasks
0832: * processed. When GenerateProjection tasks are removed off the blackboard
0833: * This method is called to take all related PG hash and subscription hashes
0834: * up to date.
0835: *
0836: * @param removedGPs - The collection of GenerateProjection tasks just removed
0837: * from the blackboard.
0838: */
0839:
0840: private void processRemovedGenProjs(Collection removedGPs,
0841: TimeSpan timeSpan) {
0842: Iterator gpIt = removedGPs.iterator();
0843: while (gpIt.hasNext()) {
0844: Task genProj = (Task) gpIt.next();
0845: Asset asset = genProj.getDirectObject();
0846: if (asset instanceof AggregateAsset) {
0847: asset = ((AggregateAsset) asset).getAsset();
0848: }
0849: PropertyGroup pg = asset
0850: .searchForPropertyGroup(supplyClassPG);
0851:
0852: Collection preds = (Collection) pgToPredsHash.get(pg);
0853: pgToPredsHash.remove(pg);
0854: pgToGPTaskHash.remove(pg);
0855:
0856: if (preds != null) {
0857: Iterator predsIt = preds.iterator();
0858: while (predsIt.hasNext()) {
0859: UnaryPredicate pred = (UnaryPredicate) predsIt
0860: .next();
0861: IncrementalSubscription sub = (IncrementalSubscription) predToSubHash
0862: .get(pred);
0863: Collection subsPGs = (Collection) subToPGsHash
0864: .get(sub);
0865: subsPGs.remove(pg);
0866: if (subsPGs.isEmpty()) {
0867: blackboard.unsubscribe(sub);
0868: subToPGsHash.remove(sub);
0869: predToSubHash.remove(pred);
0870: }
0871: }
0872: }
0873:
0874: }
0875: }
0876:
0877: /**
0878: * This method goes through the subscriptions hash table and sees if any
0879: * of the subscriptions have changed. For each subscription thats changed
0880: * its PGs are collected in a set (so it doesn't exist more than once). The
0881: * resultant PG collection are set off to be processed (ie get the MEI and
0882: * GP task and re expand ).
0883: */
0884:
0885: protected void checkAndProcessHashSubscriptions(
0886: HashSet justExpandedPGs) {
0887: HashSet PGs = new HashSet();
0888: Iterator subIt = predToSubHash.entrySet().iterator();
0889: while (subIt.hasNext()) {
0890: Map.Entry entry = (Map.Entry) subIt.next();
0891: UnaryPredicate pred = (UnaryPredicate) entry.getKey();
0892: IncrementalSubscription sub = (IncrementalSubscription) entry
0893: .getValue();
0894:
0895: if ((!sub.getChangedCollection().isEmpty())
0896: || (!sub.getRemovedCollection().isEmpty())
0897: || (!sub.getAddedCollection().isEmpty())) {
0898:
0899: if (logger.isDebugEnabled()) {
0900: logger.debug("At " + getOrgName() + "-"
0901: + getSupplyType()
0902: + "-Subscription w/predicate: " + pred
0903: + " has changed: Added: "
0904: + sub.getAddedCollection().size()
0905: + " Removed: "
0906: + +sub.getRemovedCollection().size()
0907: + " Changed: "
0908: + +sub.getChangedCollection().size());
0909: }
0910: Collection subPGs = (Collection) subToPGsHash.get(sub);
0911: if (subPGs == null) {
0912: if ((sub != orgActivities)
0913: && (logger.isErrorEnabled())) {
0914: String errString = "Subscription fired in the hash table at "
0915: + getOrgName()
0916: + ", but there are no PGs in the other hash tables that correspond. The Predicate is "
0917: + pred.getClass().getName() + ".";
0918: logger.error(errString);
0919: }
0920: } else {
0921: PGs.addAll(subPGs);
0922: }
0923: }
0924: }
0925: if (PGs.isEmpty()) {
0926: if (logger.isDebugEnabled()) {
0927: logger
0928: .debug("No subscription change,no PGs to notfiy! subToPGsHash is: "
0929: + subToPGsHash);
0930: }
0931: } else {
0932: if (logger.isDebugEnabled()) {
0933: //logger.debug("!!!Subscriptions changed got PGs to notfiy! Collection of PGs are " + PGs);
0934: }
0935:
0936: /*
0937: * Whats going on here is we're filtering out an PGs that have just been expanded in the same
0938: * execute cycle. We were anticipating that this should never happen - that just expanded
0939: * GenerateProjection tasks pgs, their subscriptions should never fire. In which case this PGs
0940: * variable will be empty or the filteredPGs is empty. If it does ever happen, like a new
0941: * Consumer PG is introduced with new subscriptions that do fire immediately we will be covered. MWD.
0942: *
0943: */
0944:
0945: HashSet filteredPGs;
0946: if (justExpandedPGs.isEmpty()) {
0947: filteredPGs = PGs;
0948: } else {
0949: filteredPGs = new HashSet();
0950: Iterator pgIt = PGs.iterator();
0951: while (pgIt.hasNext()) {
0952: PropertyGroup pg = (PropertyGroup) pgIt.next();
0953: if (!(justExpandedPGs.contains(pg))) {
0954: filteredPGs.add(pg);
0955: }
0956: }
0957: }
0958:
0959: if (!filteredPGs.isEmpty()) {
0960: TimeSpan projectSpan = new ScheduleElementImpl(
0961: getLogOPlanStartTime(), getLogOPlanEndTime());
0962: processSubscriptionChangedPG(filteredPGs, projectSpan);
0963: //logger.error("About to call TS's clearState method for: " +getOrgName());
0964: genProjTaskScheduler.clearState(); //????
0965: }
0966: }
0967: }
0968:
0969: /**
* This method goes through the hash tables, collects all the PGs currently
* registered in them, and has all of them reprocessed. This occurs
* if we get the log OPlan for the first time, possibly after some of the
* subscriptions in the hash tables have already fired.
0974: * <p/>
0975: * TODO: Do we need this method anymore
0976: * <p/>
0977: * protected void processAllHashSubscriptions() {
0978: * Set PGs = pgToPredsHash.keySet();
0979: * if (PGs.isEmpty()) {
0980: * if (logger.isDebugEnabled()) {
0981: * logger.debug("No PGs in the hash tables: " + pgToPredsHash);
0982: * }
0983: * } else {
0984: * if (logger.isDebugEnabled()) {
0985: * //logger.debug("!!!Subscriptions changed got PGs to notfiy! Collection of PGs are " + PGs);
0986: * logger.debug("DemandForecastPlugin::ProcessAllHashSubscriptions at " + myOrganization +
0987: * "with Num PGs: " + PGs.size());
0988: * }
0989: * processSubscriptionChangedPG(PGs);
0990: * }
0991: * }
0992: */
0993:
0994: //Invoke the BG and the genProjExpander if there are changes
0995: //in the OrgActivities or Removals of OrgActivities.
0996: private void processSubscriptionChangedPG(Collection PGs,
0997: TimeSpan projectSpan) {
0998: Iterator pgIt = PGs.iterator();
0999: while (pgIt.hasNext()) {
1000: //ConsumerPG pg = (ConsumerPG) pgIt.next();
1001: //Asset asset = pg.getMei();
1002: PropertyGroup pg = (PropertyGroup) pgIt.next();
1003: Task gp = (Task) pgToGPTaskHash.get(pg);
1004: if (gp != null) {
1005: PlanElement pe = gp.getPlanElement();
1006: if ((pe == null) || (!(pe instanceof Disposition))) {
1007: logger
1008: .debug("******* invoking BG and GPE with changed Subscriptions **********");
1009: invokeGenProjectionsExp(pg, gp, projectSpan);
1010: }
1011: } else {
1012: if (logger.isErrorEnabled()) {
1013: logger
1014: .error("Property group :"
1015: + pg
1016: + " does not have an associated GenerateProjections task in the HashMap.");
1017: }
1018: }
1019: }
1020: }
1021:
1022: private void invokeGenProjectionsExp(PropertyGroup pg,
1023: Task genProj, TimeSpan projectSpan) {
1024: Collection pgInputs = getSubscriptions(pg);
1025: Schedule paramSchedule = getParameterSchedule(pg, pgInputs,
1026: projectSpan);
1027: //the expandGenerateProjections code will handle if the paramSchedule is null by putting
1028: // a disposition on the GP task since it will have a null plan element.
1029: // if (paramSchedule != null) {
1030: generateProjectionsExpander.expandGenerateProjections(genProj,
1031: paramSchedule, genProj.getDirectObject(), projectSpan);
1032: // }
1033: }
1034:
1035: private void removeFromDetReq(Collection addedDRs,
1036: Collection removedAssets) {
1037: // with one oplan we should only have one DR for MI.
1038: Iterator drIt = addedDRs.iterator();
1039: if (drIt.hasNext()) {
1040: Task detReq = (Task) drIt.next();
1041: //synch on the detReq task so only one instance of this plugin
1042: // checks and creates a single agg task and then creates an
1043: // empty expansion (wf) for the maintain inventory for each item tasks
1044: synchronized (detReq) {
1045: determineRequirementsExpander
1046: .removeSubtasksFromDetermineRequirements(
1047: detReq, removedAssets);
1048: processedDetReq = true;
1049: }
1050: }
1051: }
1052:
1053: protected Collection getSubscriptions(PropertyGroup pg) {
1054: if (!pgToPredsHash.containsKey(pg)) {
1055: addNewPG(pg);
1056: }
1057: ArrayList pgInputs = new ArrayList();
1058: Collection preds = (Collection) pgToPredsHash.get(pg);
1059:
1060: Iterator predsIt = preds.iterator();
1061: while (predsIt.hasNext()) {
1062: UnaryPredicate pred = (UnaryPredicate) predsIt.next();
1063: ArrayList inputPair = new ArrayList();
1064: IncrementalSubscription sub = (IncrementalSubscription) predToSubHash
1065: .get(pred);
1066: inputPair.add(pred);
1067: inputPair.add(sub.getCollection());
1068: pgInputs.add(inputPair);
1069: }
1070: return pgInputs;
1071: }
1072:
1073: protected void addNewPG(PropertyGroup pg) {
1074: Collection preds = getPredicates(pg);
1075: Iterator predIt = preds.iterator();
1076: while (predIt.hasNext()) {
1077: UnaryPredicate pred = (UnaryPredicate) predIt.next();
1078: IncrementalSubscription sub = (IncrementalSubscription) predToSubHash
1079: .get(pred);
1080: if (sub == null) {
1081: sub = (IncrementalSubscription) blackboard
1082: .subscribe(pred);
1083:
1084: //MWD Defuse the subscriptions Additions collection. A new PG with a new subscription
1085: //comes in only when a new GenerateProjections task comes in with a new MEI - this
1086: //is handled by the PG and GenerateProjectionExpander in the same section of code in
1087: //the execute() method when new GP tasks come in. We don't want to expand the
1088: //new task a second time when the checkAndProcessHashSubscriptions() kicks off later
1089: //in the execute run. So we disarm the added collection here when it is first added
1090: //to the black board.
1091: //sub.getAddedCollection().clear();
1092:
1093: predToSubHash.put(pred, sub);
1094: }
1095: Collection PGs = (Collection) subToPGsHash.get(sub);
1096: if (PGs == null) {
1097: PGs = new ArrayList();
1098: subToPGsHash.put(sub, PGs);
1099: }
1100: if (!PGs.contains(pg)) {
1101: PGs.add(pg);
1102: }
1103: }
1104: Collection hashPreds = (Collection) pgToPredsHash.get(pg);
1105: if (hashPreds == null) {
1106: pgToPredsHash.put(pg, preds);
1107: }
1108: }
1109:
1110: private void rehydrateHashMaps() {
1111: // Took this out because getAllTasks() is the union of all added, changed,
1112: // and removed tasks of which there are none after rehydration -
1113: // getAllTasksCollection is currently everything on the subscription.
1114: // Iterator gpIt = genProjTaskScheduler.getAllTasks();
1115: Collection gpTasks = genProjTaskScheduler
1116: .getAllTasksCollection();
1117: Iterator gpIt = gpTasks.iterator();
1118: while (gpIt.hasNext()) {
1119: Task gpTask = (Task) gpIt.next();
1120: Asset mei = gpTask.getDirectObject();
1121: if (mei instanceof AggregateAsset) {
1122: mei = ((AggregateAsset) mei).getAsset();
1123: }
1124: PropertyGroup pg = mei
1125: .searchForPropertyGroup(supplyClassPG);
1126: addNewPG(pg);
1127: pgToGPTaskHash.put(pg, gpTask);
1128: }
1129: }
1130:
1131: private String getClusterSuffix(String clusterId) {
1132: String result = null;
1133: int i = clusterId.lastIndexOf("-");
1134: if (i == -1) {
1135: result = clusterId;
1136: } else {
1137: result = clusterId.substring(i + 1);
1138: }
1139: return result;
1140: }
1141:
1142: /**
1143: * Creates an instance of an DetReqExpanderIfc by
1144: * searching plugin parameters for REQ_EXPANDER argument.
1145: * In the absence of an REQ_EXPANDER argument, a default is used:
1146: * org.cougaar.logistics.plugin.projection.DetermineRequirementsExpander
1147: *
1148: * @return {@link DetReqExpanderIfc}
1149: */
1150: private DetReqExpanderIfc getDetermineRequirementsExpanderModule() {
1151: String expanderClass = (String) pluginParams.get(REQ_EXPANDER);
1152: if (expanderClass != null) {
1153: try {
1154: Class[] paramTypes = { this .getClass() };
1155: Object[] initArgs = { this };
1156: Class cls = Class.forName(expanderClass);
1157: Constructor constructor = cls
1158: .getConstructor(paramTypes);
1159: DetReqExpanderIfc expander = (DetReqExpanderIfc) constructor
1160: .newInstance(initArgs);
1161: if (logger.isInfoEnabled()) {
1162: logger.info("Using RequirementsExpander "
1163: + expanderClass);
1164: }
1165: return expander;
1166: } catch (Exception e) {
1167: if (logger.isErrorEnabled()) {
1168: logger
1169: .error(e
1170: + " Unable to create RequirementsExpander instance of "
1171: + expanderClass
1172: + ". "
1173: + "Loading default org.cougaar.logistics.plugin.projection.DetermineRequirementsExpander");
1174: }
1175: }
1176: }
1177: return new DetermineRequirementsExpander(this );
1178: }
1179:
1180: /**
1181: * Creates an instance of an GenProjExpanderIfc by
1182: * searching plugin parameters for PROJ_EXPANDER argument.
1183: * In the absence of an PROJ_EXPANDER argument, a default is used:
1184: * org.cougaar.logistics.plugin.projection.DetermineRequirementsExpander
1185: *
1186: * @return {@link GenProjExpanderIfc}
1187: */
1188: private GenProjExpanderIfc getGenerateProjectionsExpanderModule() {
1189: String expanderClass = (String) pluginParams.get(PROJ_EXPANDER);
1190: if (expanderClass != null) {
1191: try {
1192: Class[] paramTypes = { this .getClass() };
1193: Object[] initArgs = { this };
1194: Class cls = Class.forName(expanderClass);
1195: Constructor constructor = cls
1196: .getConstructor(paramTypes);
1197: GenProjExpanderIfc expander = (GenProjExpanderIfc) constructor
1198: .newInstance(initArgs);
1199: if (logger.isInfoEnabled()) {
1200: logger.info("Using ProjectionsExpander "
1201: + expanderClass);
1202: }
1203: return expander;
1204: } catch (Exception e) {
1205: if (logger.isErrorEnabled()) {
1206: logger
1207: .error(e
1208: + " Unable to create ProjectionsExpander instance of "
1209: + expanderClass
1210: + ". "
1211: + "Loading default org.cougaar.logistics.plugin.projections.GenerateProjectionsExpander");
1212: }
1213: }
1214: }
1215: String gpExpanderClass = (String) pluginParams
1216: .get("GP_EXPANDER_CLASS");
1217: if (gpExpanderClass == null) {
1218: return new GenerateProjectionsExpander(this );
1219: } else {
1220: try {
1221: Class gpCls = Class.forName(gpExpanderClass);
1222: Class[] paramTypes = { this .getClass() };
1223: Object[] initArgs = { this };
1224: Constructor constructor = gpCls
1225: .getConstructor(paramTypes);
1226: return (GenerateProjectionsExpander) constructor
1227: .newInstance(initArgs);
1228: } catch (Exception e) {
1229: logger
1230: .error(e
1231: + " Unable to create Expander instance of "
1232: + gpExpanderClass
1233: + " Loading default org.cougaar.logistics.plugin.demand.GenerateProjectionsExpander");
1234: return new GenerateProjectionsExpander(this );
1235: }
1236: }
1237: }
1238:
1239: public void publishAddToExpansion(Task parent, Task subtask) {
1240: //attach the subtask to its parent and the parent's workflow
1241: PlanElement pe = parent.getPlanElement();
1242: Expansion expansion;
1243: NewWorkflow wf;
1244: ((NewTask) subtask).setParentTask(parent);
1245: ((NewTask) subtask).setPlan(parent.getPlan());
1246: // Task has not been expanded, create an expansion
1247: if (pe == null) {
1248: PlanningFactory factory = getPlanningFactory();
1249: // Create workflow
1250: wf = factory.newWorkflow();
1251: wf.setParentTask(parent);
1252: wf.setIsPropagatingToSubtasks(true);
1253: wf.addTask(subtask);
1254: ((NewTask) subtask).setWorkflow(wf);
1255: // Build Expansion
1256: expansion = factory.createExpansion(parent.getPlan(),
1257: parent, wf, null);
1258: // Publish Expansion
1259: publishAdd(expansion);
1260: }
1261: // Task already has expansion, add task to the workflow and publish the change
1262: else if (pe instanceof Expansion) {
1263: expansion = (Expansion) pe;
1264: wf = (NewWorkflow) expansion.getWorkflow();
1265: wf.addTask(subtask);
1266: ((NewTask) subtask).setWorkflow(wf);
1267: publishChange(expansion);
1268: } else {
1269: if (logger.isErrorEnabled()) {
1270: logger
1271: .error("publishAddToExpansion: problem pe not Expansion? "
1272: + pe);
1273: }
1274: }
1275:
1276: // Publish new task
1277: publishAdd(subtask);
1278: }
1279:
1280: private Organization getMyOrganization(Enumeration orgs) {
1281: Organization myOrg = null;
1282: // look for this organization
1283: if (orgs.hasMoreElements()) {
1284: myOrg = (Organization) orgs.nextElement();
1285: }
1286: return myOrg;
1287: }
1288:
1289: public MessageAddress getClusterId() {
1290: return getAgentIdentifier();
1291: }
1292:
1293: public String getOrgName() {
1294: if (myOrgName == null) {
1295: myOrgName = getMyOrganization().getItemIdentificationPG()
1296: .getItemIdentification();
1297: }
1298: return myOrgName;
1299: }
1300:
1301: public Class getSupplyClassPG() {
1302: return supplyClassPG;
1303: }
1304:
1305: // get the first day in theater
1306: public long getLogOPlanStartTime() {
1307: return logOPlan.getStartTime();
1308: }
1309:
1310: // get the last day in theater
1311: public long getLogOPlanEndTime() {
1312: return logOPlan.getEndTime();
1313: }
1314:
1315: public Collection getPredicates(PropertyGroup pg) {
1316: Collection preds = null;
1317: Class parameters[] = {};
1318: Object arguments[] = {};
1319: Method m = null;
1320: try {
1321: m = supplyClassPG.getMethod("getPredicates", parameters);
1322: } catch (NoSuchMethodException e) {
1323: e.printStackTrace();
1324: } catch (SecurityException e) {
1325: e.printStackTrace();
1326: }
1327: try {
1328: preds = (Collection) m.invoke(pg, arguments);
1329: return preds;
1330: } catch (IllegalAccessException e) {
1331: e.printStackTrace();
1332: } catch (IllegalArgumentException e) {
1333: e.printStackTrace();
1334: } catch (InvocationTargetException e) {
1335: e.printStackTrace();
1336: }
1337: return new ArrayList();
1338: }
1339:
1340: public Schedule getParameterSchedule(PropertyGroup pg,
1341: Collection pgInputs, TimeSpan projectSpan) {
1342: Schedule paramSchedule = null;
1343:
1344: if (projectSpan.getEndTime() <= projectSpan.getStartTime()) {
1345: if (logger.isErrorEnabled()) {
1346: logger
1347: .error("Was going to call getParameterSchedule, but the projectSpan spans a zero time span!");
1348: }
1349: } else {
1350: Class parameters[] = { Collection.class, TimeSpan.class };
1351: Object arguments[] = { pgInputs, projectSpan };
1352: Method m = null;
1353: try {
1354: if (!supplyClassPG.isInstance(pg)) {
1355: throw new IllegalArgumentException(
1356: "PG is not an instanceof of "
1357: + supplyClassPG + ": " + pg);
1358: }
1359: m = supplyClassPG.getMethod("getParameterSchedule",
1360: parameters);
1361: paramSchedule = (Schedule) m.invoke(pg, arguments);
1362: return paramSchedule;
1363: } catch (Exception e) {
1364: e.printStackTrace();
1365: }
1366: /*** TODO: MWD take out extra exceptions
1367: } catch (NoSuchMethodException e) {
1368: e.printStackTrace();
1369: } catch (SecurityException e) {
1370: e.printStackTrace();
1371: } catch (IllegalAccessException e) {
1372: e.printStackTrace();
1373: } catch (IllegalArgumentException e) {
1374: e.printStackTrace();
1375: } catch (InvocationTargetException e) {
1376: e.printStackTrace();
1377: }
1378: **/
1379: }
1380: return new ScheduleImpl();
1381: }
1382:
1383: //temp method for getting the MEI
1384: public Asset getMEI(PropertyGroup pg) {
1385: Asset mei = null;
1386: Class parameters[] = {};
1387: Object arguments[] = {};
1388: Method m = null;
1389: try {
1390: m = supplyClassPG.getMethod("getMei", parameters);
1391: } catch (NoSuchMethodException e) {
1392: e.printStackTrace();
1393: } catch (SecurityException e) {
1394: e.printStackTrace();
1395: }
1396: try {
1397: mei = (Asset) m.invoke(pg, arguments);
1398: } catch (IllegalAccessException e) {
1399: e.printStackTrace();
1400: } catch (IllegalArgumentException e) {
1401: e.printStackTrace();
1402: } catch (InvocationTargetException e) {
1403: e.printStackTrace();
1404: }
1405: return mei;
1406: }
1407:
1408: public Collection filter(UnaryPredicate predicate) {
1409: return Filters.filter(projectSupplySubscription, predicate);
1410: }
1411:
1412: /**
1413: * Returns a subset of project supply tasks for a given asset, for a given parent generate
1414: * projection task's UID.
1415: *
1416: * @param parentTask the generate projects tasks that was expanded
1417: * @return all project supply tasks of the parent generate projections task
1418: */
1419: public Collection projectSupplySet(final Task parentTask,
1420: final Asset consumedItem) {
1421: return filter(new UnaryPredicate() {
1422: public boolean execute(Object o) {
1423: Task t = (Task) o;
1424: if (t.getParentTaskUID().equals(parentTask.getUID())) {
1425: Asset a = t.getDirectObject();
1426: return a.equals(consumedItem);
1427: }
1428: return false;
1429: }
1430: });
1431: }
1432:
1433: public void updateStartAndEndTimes() {
1434: if (logOPlan != null) {
1435: if (!allOrgActivities.isEmpty()) {
1436: logOPlan.updateOrgActivities(allOrgActivities);
1437: }
1438: }
1439: }
1440:
1441: public class OrgActivityPredicate implements UnaryPredicate {
1442: private static final String predString = "OrgActivityPredicate";
1443:
1444: public boolean execute(Object o) {
1445: if (o instanceof OrgActivity) {
1446: return true;
1447: }
1448: return false;
1449: }
1450: }
1451:
1452: /**
1453: * Self-Test
1454: */
1455: public void automatedSelfTest() {
1456: if (logger.isErrorEnabled()) {
1457: if (supplyType == null)
1458: logger.error("No SupplyType Plugin parameter.");
1459: if (myOrganization == null)
1460: logger.error("Missing myorganization");
1461: }
1462: }
1463:
1464: protected long getPeriod() {
1465: if (executePeriod == -1) {
1466: executePeriod = 60 * 60 * 1000;
1467: }
1468: return executePeriod;
1469: }
1470:
1471: /**
1472: * Get the time in milliseconds that would be midnight of the day
1473: * before or first thing in the morning today.
1474: *
1475: * @return - the time in milliseconds that represents first thing in the
1476: * morning today
1477: */
1478: protected long getStartOfPeriod(long timeIn) {
1479: //long timeIn = getCurrentTimeMillis();
1480: //truncate to the whole number that represents the period num since the start of time.
1481:
1482: long period = getPeriod();
1483:
1484: long periods = (long) (timeIn / period);
1485: //Multiply it back to which gives the start of the period.
1486: long timeOut = periods * period;
1487: if (timeIn == timeOut) {
1488: if (logger.isDebugEnabled()) {
1489: logger
1490: .debug("GetStartOfToday - unexpected timeIn==timeOut=="
1491: + new Date(timeOut));
1492: }
1493: }
1494: return timeOut;
1495: }
1496: }
|