0001: /*
0002: * <copyright>
0003: *
0004: * Copyright 1997-2004 BBNT Solutions, LLC
0005: * under sponsorship of the Defense Advanced Research Projects
0006: * Agency (DARPA).
0007: *
0008: * You can redistribute this software and/or modify it under the
0009: * terms of the Cougaar Open Source License as published on the
0010: * Cougaar Open Source Website (www.cougaar.org).
0011: *
0012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023: *
0024: * </copyright>
0025: */
0026:
0027: package org.cougaar.mlm.plugin.organization;
0028:
0029: import java.util.Collection;
0030: import java.util.Enumeration;
0031: import java.util.HashMap;
0032: import java.util.Iterator;
0033: import java.util.SortedSet;
0034: import java.util.TreeSet;
0035: import java.util.Vector;
0036:
0037: import org.cougaar.core.blackboard.IncrementalSubscription;
0038: import org.cougaar.core.component.ServiceRevokedEvent;
0039: import org.cougaar.core.component.ServiceRevokedListener;
0040: import org.cougaar.core.logging.LoggingServiceWithPrefix;
0041: import org.cougaar.core.plugin.ComponentPlugin;
0042: import org.cougaar.core.persist.PersistenceNotEnabledException;
0043: import org.cougaar.core.service.EventService;
0044: import org.cougaar.core.service.LoggingService;
0045: import org.cougaar.glm.ldm.Constants;
0046: import org.cougaar.glm.ldm.asset.Organization;
0047: import org.cougaar.planning.ldm.PlanningFactory;
0048: import org.cougaar.planning.ldm.asset.AbstractAsset;
0049: import org.cougaar.planning.ldm.asset.Asset;
0050: import org.cougaar.planning.ldm.plan.AllocationResult;
0051: import org.cougaar.planning.ldm.plan.AllocationResultAggregator;
0052: import org.cougaar.planning.ldm.plan.AspectType;
0053: import org.cougaar.planning.ldm.plan.AuxiliaryQueryType;
0054: import org.cougaar.planning.ldm.plan.Expansion;
0055: import org.cougaar.planning.ldm.plan.NewPrepositionalPhrase;
0056: import org.cougaar.planning.ldm.plan.NewTask;
0057: import org.cougaar.planning.ldm.plan.NewWorkflow;
0058: import org.cougaar.planning.ldm.plan.PlanElement;
0059: import org.cougaar.planning.ldm.plan.PrepositionalPhrase;
0060: import org.cougaar.planning.ldm.plan.Task;
0061: import org.cougaar.planning.ldm.plan.TaskScoreTable;
0062: import org.cougaar.planning.ldm.plan.Verb;
0063: import org.cougaar.planning.ldm.plan.Workflow;
0064: import org.cougaar.planning.plugin.util.PluginHelper;
0065: import org.cougaar.planning.service.LDMService;
0066: import org.cougaar.util.PropertyParser;
0067: import org.cougaar.util.UnaryPredicate;
0068:
/**
 * The GLSExpanderPlugin takes the initial GetLogSupport task received by
 * an agent and expands it into GetLogSupport tasks for its subordinates.
 *
 * @property org.cougaar.mlm.plugin.organization.GLSExpanderPlugin.persistEarly default false. When true, force a persist after this agent and all subordinates have received the GLS task. Used to avoid kill & restarts that rescind GLS task.
 **/
0075: public class GLSExpanderPlugin extends ComponentPlugin implements
0076: GLSConstants {
0077:
0078: protected EventService eventService;
0079:
0080: /**
0081: * RFE 3162: Set to true to force a persistence as soon as the agent
0082: * has finished propogating Stage-1 GLS. Send a CougaarEvent announcing completion.
0083: * This allows controllers to kill the agent as early as possible, without
0084: * potentially causing logistics plugin problems on rehydration.
0085: * Defaults to false - ie, do not force an early persist.
0086: **/
0087: private static final boolean persistEarly;
0088: static {
0089: persistEarly = PropertyParser
0090: .getBoolean(
0091: "org.cougaar.mlm.plugin.organization.GLSExpanderPlugin.persistEarly",
0092: false);
0093: }
0094:
/**
 * Subscriptions to the incoming tasks, one per verb (GetLogSupport,
 * PropagateRegisterServices, PropagateFindProviders). They are created in
 * setupSubscriptions2() and therefore stay null until the self
 * organization is known.
 **/
private IncrementalSubscription myGLSTasks;
private IncrementalSubscription myRegisterServicesTasks;
private IncrementalSubscription myFindProvidersTasks;

/**
 * Subscriptions to the Expansions of the tasks above (the Expansions
 * this plugin creates). Also created in setupSubscriptions2().
 **/
private IncrementalSubscription myGLSExpansions;
private IncrementalSubscription myRegisterServicesExpansions;
private IncrementalSubscription myFindProvidersExpansions;

/**
 * Subscription to all DetermineRequirements tasks; used to avoid
 * creating them a second time.
 **/
private IncrementalSubscription myDetermineRequirementsTasks;

/**
 * The Socrates ("know thyself") subscription: Organization assets for
 * which isSelf() is true.
 **/
private IncrementalSubscription mySelfOrgs;

/** Our own Organization asset; null until it shows up on mySelfOrgs. **/
private Organization mySelfOrgAsset = null;

/**
 * Plugin parameters: the types of DetermineRequirements tasks to
 * generate (one task per entry).
 **/
private String[] myParams = null;

/** Planning factory obtained from the LDMService; reset to null if that service is revoked. **/
private PlanningFactory theLDMF = null;
0123:
0124: /**
0125: * Override the setupSubscriptions() in ComponentPlugin
0126: * Get an LDMService for factory calls
0127: * Use the blackboard service inherited from ComponentPlugin
0128: **/
0129: protected void setupSubscriptions() {
0130: String me = getAgentIdentifier().toString();
0131: logger = LoggingServiceWithPrefix.add(logger, me + ": ");
0132:
0133: // get event service - for use with persistEarly work
0134: eventService = (EventService) getBindingSite()
0135: .getServiceBroker().getService(this ,
0136: EventService.class, null);
0137:
0138: //System.out.println("setupSubscriptions: "+getAgentIdentifier());
0139: //get the LDM service to access the object factories from my bindingsite's servicebroker
0140: LDMService ldmService = null;
0141: if (theLDMF == null) {
0142: ldmService = (LDMService) getBindingSite()
0143: .getServiceBroker().getService(this ,
0144: LDMService.class,
0145: new ServiceRevokedListener() {
0146: public void serviceRevoked(
0147: ServiceRevokedEvent re) {
0148: theLDMF = null;
0149: }
0150: });
0151: }
0152: //use the service
0153: theLDMF = ldmService.getFactory();
0154:
0155: Collection params = getParameters();
0156: if (params != null) {
0157: myParams = (String[]) params.toArray(new String[params
0158: .size()]);
0159: } else {
0160: myParams = new String[0];
0161: }
0162:
0163: // subscribe using the blackboardservice - blackboard variable(representing the service)
0164: //is inherited from ComponentPlugin
0165: mySelfOrgs = (IncrementalSubscription) blackboard
0166: .subscribe(mySelfOrgAssetPred);
0167:
0168: if (blackboard.didRehydrate()) {
0169: processOrgAssets(mySelfOrgs.elements()); // May already be there
0170: }
0171: }
0172:
0173: private UnaryPredicate myDetermineRequirementsPredicate = new UnaryPredicate() {
0174: public boolean execute(Object o) {
0175: if (o instanceof Task) {
0176: Task task = (Task) o;
0177: Verb verb = task.getVerb();
0178: return (Constants.Verb.DetermineRequirements
0179: .equals(verb));
0180: }
0181: return false;
0182: }
0183: };
0184:
0185: private class TaskPredicate implements UnaryPredicate {
0186: private Verb myVerb;
0187:
0188: public TaskPredicate(Verb verb) {
0189: myVerb = verb;
0190: }
0191:
0192: public boolean execute(Object o) {
0193:
0194: if (o instanceof Task) {
0195: Task task = (Task) o;
0196: Verb verb = task.getVerb();
0197: if (verb.equals(myVerb)) {
0198: PrepositionalPhrase pp = task
0199: .getPrepositionalPhrase(FOR_ORGANIZATION);
0200: if (pp != null) {
0201: return pp.getIndirectObject().equals(
0202: mySelfOrgAsset);
0203: }
0204: }
0205: }
0206: return false;
0207: }
0208: }
0209:
0210: private static class ExpansionPredicate implements UnaryPredicate {
0211: private UnaryPredicate myTaskPredicate;
0212:
0213: public ExpansionPredicate(UnaryPredicate taskPredicate) {
0214: myTaskPredicate = taskPredicate;
0215: }
0216:
0217: public boolean execute(Object o) {
0218: if (o instanceof Expansion) {
0219: Expansion exp = (Expansion) o;
0220: return myTaskPredicate.execute(exp.getTask());
0221: }
0222: return false;
0223: }
0224: }
0225:
0226: private void setupSubscriptions2() {
0227: UnaryPredicate taskPredicate = new TaskPredicate(
0228: GET_LOG_SUPPORT);
0229: myGLSTasks = (IncrementalSubscription) blackboard
0230: .subscribe(taskPredicate);
0231: myGLSExpansions = (IncrementalSubscription) blackboard
0232: .subscribe(new ExpansionPredicate(taskPredicate));
0233:
0234: taskPredicate = new TaskPredicate(PROPAGATE_REGISTER_SERVICES);
0235: myRegisterServicesTasks = (IncrementalSubscription) blackboard
0236: .subscribe(taskPredicate);
0237: myRegisterServicesExpansions = (IncrementalSubscription) blackboard
0238: .subscribe(new ExpansionPredicate(taskPredicate));
0239:
0240: taskPredicate = new TaskPredicate(PROPAGATE_FIND_PROVIDERS);
0241: myFindProvidersTasks = (IncrementalSubscription) blackboard
0242: .subscribe(taskPredicate);
0243: myFindProvidersExpansions = (IncrementalSubscription) blackboard
0244: .subscribe(new ExpansionPredicate(taskPredicate));
0245:
0246: myDetermineRequirementsTasks = (IncrementalSubscription) blackboard
0247: .subscribe(myDetermineRequirementsPredicate);
0248:
0249: }
0250:
0251: /**
0252: * The predicate for the Socrates subscription
0253: **/
0254: private static UnaryPredicate mySelfOrgAssetPred = new UnaryPredicate() {
0255: public boolean execute(Object o) {
0256:
0257: if (o instanceof Organization) {
0258: Organization org = (Organization) o;
0259: return org.isSelf();
0260: }
0261: return false;
0262: }
0263: };
0264:
0265: /**
0266: * Plugin execute method is called every time one of our
0267: * subscriptions has something to do
0268: **/
0269: protected void execute() {
0270:
0271: if (mySelfOrgs.hasChanged()) {
0272: processOrgAssets(mySelfOrgs.getAddedList());
0273: }
0274:
0275: if (myRegisterServicesTasks == null) {
0276: return; // Still waiting for ourself
0277: }
0278:
0279: if (myRegisterServicesTasks.hasChanged()) {
0280: Collection adds = myRegisterServicesTasks
0281: .getAddedCollection();
0282: if (logger.isDebugEnabled()) {
0283: logger.debug("Expanding " + adds.size()
0284: + " PropagateRegisterServices tasks");
0285: }
0286:
0287: handleNewTasks(adds);
0288:
0289: Collection changes = myRegisterServicesTasks
0290: .getChangedCollection();
0291: handleChangedTasks(changes);
0292: }
0293:
0294: if (myFindProvidersTasks.hasChanged()) {
0295: Collection adds = myFindProvidersTasks.getAddedCollection();
0296: if (logger.isDebugEnabled()) {
0297: logger.debug("Expanding " + adds.size()
0298: + " PropagateFindProviders tasks");
0299: }
0300:
0301: handleNewTasks(adds);
0302:
0303: Collection changes = myFindProvidersTasks
0304: .getChangedCollection();
0305: handleChangedTasks(changes);
0306: }
0307:
0308: if (myGLSTasks.hasChanged()) {
0309: Collection adds = myGLSTasks.getAddedCollection();
0310: if (logger.isDebugEnabled()) {
0311: logger.debug("Expanding " + adds.size() + " GLS tasks");
0312: }
0313:
0314: handleNewTasks(adds);
0315:
0316: Collection changes = myGLSTasks.getChangedCollection();
0317: handleChangedTasks(changes);
0318: }
0319:
0320: if (myRegisterServicesExpansions.hasChanged()) {
0321: PluginHelper
0322: .updateAllocationResult(myRegisterServicesExpansions);
0323: }
0324:
0325: if (myFindProvidersExpansions.hasChanged()) {
0326: PluginHelper
0327: .updateAllocationResult(myFindProvidersExpansions);
0328: }
0329:
0330: if (myGLSExpansions.hasChanged()) {
0331:
0332: PluginHelper.updateAllocationResult(myGLSExpansions);
0333:
0334: /////////////////
0335: // Follows new logic to persist once GLS Stage-1 propogation
0336: // Is complete, so any kill of this agent
0337: // does not remove the GLS task on reconciliation
0338:
0339: // I want to persist when the GLS FOR self_org Stage1
0340: // has just gone confident
0341: if (persistEarly) {
0342: boolean s1Done = false;
0343: Enumeration changedPEs = myGLSExpansions
0344: .getChangedList();
0345: while (changedPEs.hasMoreElements()) {
0346: Expansion pe = (Expansion) changedPEs.nextElement();
0347: // Did this PE just change RepResult
0348: if (PluginHelper
0349: .checkChangeReports(
0350: myGLSExpansions
0351: .getChangeReports(pe),
0352: PlanElement.ReportedResultChangeReport.class)) {
0353: // Is the Est now 1?
0354: if (pe.getEstimatedResult() != null
0355: && pe.getEstimatedResult()
0356: .getConfidenceRating() == 1.0) {
0357: if (logger.isDebugEnabled())
0358: logger
0359: .debug("A GLS Expansion changed RepResult, now has conf 1: "
0360: + pe);
0361: Task task = pe.getTask();
0362: if (task == null) {
0363: if (logger.isInfoEnabled())
0364: logger
0365: .info("Null task for GLSExpansion: "
0366: + pe);
0367: continue;
0368: }
0369: // Now: is this task for self org and stage 1?
0370: SortedSet stages = (SortedSet) task
0371: .getPrepositionalPhrase(
0372: FOR_OPLAN_STAGES)
0373: .getIndirectObject();
0374: if (stages == null || stages.isEmpty()
0375: || stages.size() != 2) {
0376: // No good
0377: if (logger.isDebugEnabled())
0378: logger
0379: .debug("OplanStages say this is not stage 1: "
0380: + stages);
0381: } else {
0382: // FIXME: Do I need to confirm that the estresult
0383: // used to be _not_ 1?
0384: // FIXME: Is this the correct expansion?
0385: // Check that there is only the 1
0386: // GLS task on my subscription
0387: if (myGLSTasks.size() != 1) {
0388: if (logger.isDebugEnabled())
0389: logger
0390: .debug("I have more than the 1 GLS task?!?: "
0391: + myGLSTasks
0392: .size());
0393: } else {
0394: if (logger.isDebugEnabled())
0395: logger
0396: .debug("The only GLS for self has 2 stages and just changed its reported result and has a conf of 1: "
0397: + task);
0398: s1Done = true;
0399: break;
0400: }
0401: }
0402: }
0403: } // end checkChangeReports
0404: } // end loop over changed PEs
0405:
0406: if (s1Done) {
0407: if (logger.isDebugEnabled())
0408: logger
0409: .debug("Stage1 GLS task Done propogating. Will persist.");
0410: persistEarly();
0411: }
0412: } // if persistEarly
0413: } // if GLSExpansions changed
0414: } // execute()
0415:
0416: private void persistEarly() {
0417: try {
0418: // Bug 3282: Persistence doesn't put in the correct reasons for blocking in SchedulableStatus
0419: getBlackboardService().persistNow();
0420:
0421: // Now send a Cougaar event indicating the agent has sent GLS
0422: // and has persisted them (so no danger it will be rescinded)
0423: if (eventService != null && eventService.isEventEnabled()) {
0424: eventService.event(getAgentIdentifier()
0425: + " persisted after propogating GLS.");
0426: } else {
0427: logger
0428: .info(getAgentIdentifier()
0429: + " (no event service): persisted after propogating GLS.");
0430: }
0431: } catch (PersistenceNotEnabledException nope) {
0432: if (eventService != null && eventService.isEventEnabled()) {
0433: eventService
0434: .event(getAgentIdentifier()
0435: + " finished propogating GLS (persistence not enabled).");
0436: } else {
0437: logger
0438: .info(getAgentIdentifier()
0439: + " (no event service): finished propogating GLS (persistence not enabled).");
0440: }
0441: } // try/catch block
0442: }
0443:
0444: private void processOrgAssets(Enumeration e) {
0445: if (e.hasMoreElements()) {
0446: mySelfOrgAsset = (Organization) e.nextElement();
0447: // Setup our other subscriptions now that we know ourself
0448: if (myRegisterServicesTasks == null) {
0449: setupSubscriptions2();
0450: }
0451: }
0452: }
0453:
0454: private void handleNewTasks(Collection adds) {
0455:
0456: for (Iterator iterator = adds.iterator(); iterator.hasNext();) {
0457: Task task = (Task) iterator.next();
0458: if (task.getPlanElement() != null) {
0459: if (logger.isWarnEnabled())
0460: logger
0461: .warn("GLSExpanderPlugin.execute - strange, task "
0462: + task.getUID()
0463: + "\nhas already been expanded with p.e.:\n"
0464: + task.getPlanElement()
0465: + "\nSo skipping already expanded task.");
0466: } else {
0467: expand(task);
0468: }
0469: }
0470: }
0471:
0472: private void handleChangedTasks(Collection changes) {
0473: // Should never get changed GLS tasks so don't even try to handle them.
0474: // Last attempt to process changes led to months of debugging.
0475: for (Iterator iterator = changes.iterator(); iterator.hasNext();) {
0476: logger.error(getAgentIdentifier()
0477: + " ignoring a changed task "
0478: + (Task) iterator.next());
0479: }
0480: }
0481:
0482: /**
0483: * Expand a task into a GLS for subordinates plus
0484: * DETERMINEREQUIREMENTS for all types specified by params.
0485: * @param task The Task to expand.
0486: **/
0487: public void expand(Task task) {
0488: Verb taskVerb = task.getVerb();
0489:
0490: if (taskVerb.equals(GET_LOG_SUPPORT)) {
0491: expandGLS(task);
0492: } else if (taskVerb.equals(PROPAGATE_REGISTER_SERVICES)) {
0493: expandRegisterServices(task);
0494: } else if (taskVerb.equals(PROPAGATE_FIND_PROVIDERS)) {
0495: expandFindProviders(task);
0496: }
0497: }
0498:
0499: private void expandRegisterServices(Task task) {
0500: Vector subtasks = new Vector(2);
0501: AllocationResult estResult = PluginHelper
0502: .createEstimatedAllocationResult(task, theLDMF, 0.0,
0503: true);
0504: subtasks.addElement(createForSubordinatesTask(task));
0505: subtasks.addElement(createRegisterServices(task));
0506: Expansion exp = PluginHelper.wireExpansion(task, subtasks,
0507: theLDMF, estResult);
0508:
0509: // Use allocation result aggregator which ignores determine requirements
0510: // tasks.
0511: NewWorkflow workflow = (NewWorkflow) exp.getWorkflow();
0512: workflow.setAllocationResultAggregator(AGGREGATOR);
0513:
0514: //use the helper to publish the expansion and the wf subtasks all in one
0515: PluginHelper.publishAddExpansion(blackboard, exp);
0516: }
0517:
0518: private void expandFindProviders(Task task) {
0519: Vector subtasks = new Vector(2);
0520: AllocationResult estResult = PluginHelper
0521: .createEstimatedAllocationResult(task, theLDMF, 0.0,
0522: true);
0523: subtasks.addElement(createForSubordinatesTask(task));
0524: subtasks.addElement(createFindProviders(task));
0525: Expansion exp = PluginHelper.wireExpansion(task, subtasks,
0526: theLDMF, estResult);
0527:
0528: // Use allocation result aggregator which ignores determine requirements
0529: // tasks.
0530: NewWorkflow workflow = (NewWorkflow) exp.getWorkflow();
0531: workflow.setAllocationResultAggregator(AGGREGATOR);
0532:
0533: //use the helper to publish the expansion and the wf subtasks all in one
0534: PluginHelper.publishAddExpansion(blackboard, exp);
0535: }
0536:
0537: private void expandGLS(Task task) {
0538: Vector subtasks = new Vector();
0539: AllocationResult estResult = PluginHelper
0540: .createEstimatedAllocationResult(task, theLDMF, 0.0,
0541: true);
0542: subtasks.addElement(createForSubordinatesTask(task));
0543: Expansion exp = PluginHelper.wireExpansion(task, subtasks,
0544: theLDMF, estResult);
0545:
0546: // Use allocation result aggregator which ignores determine requirements
0547: // tasks.
0548: NewWorkflow workflow = (NewWorkflow) exp.getWorkflow();
0549: workflow.setAllocationResultAggregator(AGGREGATOR);
0550:
0551: //use the helper to publish the expansion and the wf subtasks all in one
0552: PluginHelper.publishAddExpansion(blackboard, exp);
0553:
0554: maybeAddDetermineRequirementsTasks(task, exp);
0555: }
0556:
0557: private void maybeAddDetermineRequirementsTasks(Task task,
0558: Expansion exp) {
0559: SortedSet stages = (SortedSet) task.getPrepositionalPhrase(
0560: FOR_OPLAN_STAGES).getIndirectObject();
0561: if (stages.isEmpty()) {
0562: if (logger.isDebugEnabled()) {
0563: logger.debug("stages.isEmpty() not adding DR tasks");
0564: }
0565: return;
0566: }
0567:
0568: // Check whether DR already exist
0569: if (myDetermineRequirementsTasks.size() > 0) {
0570: if (logger.isDebugEnabled()) {
0571: logger
0572: .debug("Already have DetermineRequirements tasks "
0573: + myDetermineRequirementsTasks);
0574: }
0575: return;
0576: }
0577:
0578: if (logger.isDebugEnabled()) {
0579: logger.debug("Adding DR tasks");
0580: }
0581: for (int i = 0; i < myParams.length; i++) {
0582: NewTask subtask = createDetermineRequirementsTask(task,
0583: myParams[i]);
0584: PluginHelper.wireExpansion(exp, subtask);
0585: blackboard.publishAdd(subtask);
0586: if (logger.isDebugEnabled()) {
0587: logger.debug("Add DR task " + myParams[i]);
0588: }
0589: }
0590: }
0591:
0592: /**
0593: * Create the for subordinates task resulting from the given
0594: * parent Task.
0595: * @param task Parent task to be used in creating an expanded gls task
0596: * @return NewTask the new expanded task.
0597: **/
0598: private NewTask createForSubordinatesTask(Task task) {
0599: // Create copy of parent Task
0600: NewTask subtask = createTask(task);
0601: subtask.setVerb(task.getVerb());
0602:
0603: Vector prepphrases = new Vector(3);
0604:
0605: // make the "subordinates" abstract asset and add a prep phrase with it
0606: Asset subasset_proto = theLDMF.createPrototype(Asset.class,
0607: "Subordinates");
0608: Asset subasset = theLDMF.createInstance(subasset_proto);
0609: NewPrepositionalPhrase newpp = theLDMF.newPrepositionalPhrase();
0610: newpp.setPreposition(FOR_ORGANIZATION);
0611: newpp.setIndirectObject(theLDMF.cloneInstance(subasset));
0612: prepphrases.addElement(newpp);
0613:
0614: Enumeration origpp = task.getPrepositionalPhrases();
0615: while (origpp.hasMoreElements()) {
0616: PrepositionalPhrase app = (PrepositionalPhrase) origpp
0617: .nextElement();
0618: if (app.getPreposition().equals(WITH_C0)) {
0619: prepphrases.addElement(app);
0620: } else if (app.getPreposition().equals(FOR_OPLAN_STAGES)) {
0621: app = copyOplanStagesPhrase(app);
0622: prepphrases.addElement(app);
0623: }
0624: }
0625: subtask.setPrepositionalPhrases(prepphrases.elements());
0626: return subtask;
0627: }
0628:
0629: /**
0630: * Create the FindProviders task resulting from the given
0631: * parent Task.
0632: * @param task Parent task to be used in creating an expanded gls task
0633: * @return NewTask the new expanded task.
0634: **/
0635: private NewTask createFindProviders(Task task) {
0636: // Create copy of parent Task
0637: NewTask subtask = createTask(task);
0638: subtask.setVerb(Constants.Verb.FindProviders);
0639:
0640: Vector prepphrases = new Vector();
0641: Enumeration origpp = task.getPrepositionalPhrases();
0642: PrepositionalPhrase oplanStagespp = null;
0643:
0644: while (origpp.hasMoreElements()) {
0645: PrepositionalPhrase app = (PrepositionalPhrase) origpp
0646: .nextElement();
0647: if (app.getPreposition().equals(FOR_OPLAN_STAGES)) {
0648: oplanStagespp = copyOplanStagesPhrase(app);
0649: }
0650: }
0651:
0652: if (oplanStagespp != null) {
0653: prepphrases.addElement(oplanStagespp);
0654: subtask.setPrepositionalPhrases(prepphrases.elements());
0655: }
0656:
0657: return subtask;
0658: }
0659:
0660: /**
0661: * Create the RegisterServices task resulting from the given
0662: * parent Task.
0663: * @param task Parent task to be used in creating an expanded gls task
0664: * @return NewTask the new expanded task.
0665: **/
0666: private NewTask createRegisterServices(Task task) {
0667: // Create copy of parent Task
0668: NewTask subtask = createTask(task);
0669: subtask.setVerb(Constants.Verb.RegisterServices);
0670:
0671: Vector prepphrases = new Vector();
0672: Enumeration origpp = task.getPrepositionalPhrases();
0673: PrepositionalPhrase oplanStagespp = null;
0674:
0675: while (origpp.hasMoreElements()) {
0676: PrepositionalPhrase app = (PrepositionalPhrase) origpp
0677: .nextElement();
0678: if (app.getPreposition().equals(FOR_OPLAN_STAGES)) {
0679: oplanStagespp = copyOplanStagesPhrase(app);
0680: }
0681: }
0682:
0683: if (oplanStagespp != null) {
0684: prepphrases.addElement(oplanStagespp);
0685: subtask.setPrepositionalPhrases(prepphrases.elements());
0686: }
0687:
0688: return subtask;
0689: }
0690:
0691: /**
0692: * Creates a DETERMINEREQUIREMENTS task of the specified type
0693: * @return Task The DetermineRequirements task
0694: **/
0695: public NewTask createDetermineRequirementsTask(Task task,
0696: String ofTypePreposition) {
0697: NewTask subtask = createTask(task);
0698: subtask.setVerb(Constants.Verb.DetermineRequirements);
0699:
0700: Vector prepphrases = new Vector();
0701:
0702: // get the existing prep phrase(s) - look for FOR <Agentname>
0703: // and add that one to the new subtask
0704: Enumeration origpp = task.getPrepositionalPhrases();
0705: while (origpp.hasMoreElements()) {
0706: PrepositionalPhrase app = (PrepositionalPhrase) origpp
0707: .nextElement();
0708: if ((app.getPreposition().equals(FOR_ORGANIZATION))
0709: && (app.getIndirectObject() instanceof Asset)) {
0710: prepphrases.addElement(app);
0711: }
0712: }
0713: Asset io_proto = theLDMF.createPrototype(AbstractAsset.class,
0714: ofTypePreposition);
0715: Asset indirectobj = theLDMF.createInstance(io_proto);
0716: NewPrepositionalPhrase pp = theLDMF.newPrepositionalPhrase();
0717: pp.setPreposition(Constants.Preposition.OFTYPE);
0718: pp.setIndirectObject(indirectobj);
0719: prepphrases.addElement(pp);
0720: subtask.setPrepositionalPhrases(prepphrases.elements());
0721: return subtask;
0722: }
0723:
0724: /**
0725: * Create a subtask of the parent
0726: * @param task the Parent
0727: * @return Newtask the newly created subtask
0728: **/
0729: private NewTask createTask(Task task) {
0730: NewTask subtask = theLDMF.newTask();
0731: subtask.setParentTask(task);
0732: subtask.setSource(this .getAgentIdentifier());
0733: if (task.getDirectObject() != null) {
0734: subtask.setDirectObject(theLDMF.cloneInstance(task
0735: .getDirectObject()));
0736: } else {
0737: subtask.setDirectObject(null);
0738: }
0739:
0740: subtask.setPlan(task.getPlan());
0741: synchronized (task) {
0742: subtask.setPreferences(task.getPreferences());
0743: }
0744: return subtask;
0745: }
0746:
0747: private PrepositionalPhrase copyOplanStagesPhrase(
0748: PrepositionalPhrase origPhrase) {
0749: if (FOR_OPLAN_STAGES.equals(origPhrase.getPreposition())) {
0750: NewPrepositionalPhrase newPhrase = theLDMF
0751: .newPrepositionalPhrase();
0752: newPhrase.setPreposition(FOR_OPLAN_STAGES);
0753: newPhrase.setIndirectObject(new TreeSet(
0754: (SortedSet) origPhrase.getIndirectObject()));
0755: return newPhrase;
0756: } else {
0757: logger
0758: .error(
0759: "copyOplanStagePhrase passed an invalid PrepositionalPhrase.",
0760: new Throwable());
0761: return null;
0762: }
0763: }
0764:
0765: /** rely upon load-time introspection to set these services - don't worry about revokation. */
0766: public final void setLoggingService(LoggingService logger) {
0767: this .logger = logger;
0768: }
0769:
0770: /**
0771: * Everybody needs a logger
0772: **/
0773: protected LoggingService logger;
0774:
0775: private static AllocationResultAggregator AGGREGATOR;
0776:
0777: static {
0778: AGGREGATOR = new GLSExpansionAggregator();
0779: }
0780:
0781: private static class GLSExpansionAggregator implements
0782: AllocationResultAggregator {
0783: private static final String UNDEFINED = "UNDEFINED";
0784:
0785: public AllocationResult calculate(Workflow wf,
0786: TaskScoreTable tst, AllocationResult currentar) {
0787: double acc[] = new double[AspectType._ASPECT_COUNT];
0788: acc[START_TIME] = Double.MAX_VALUE;
0789: acc[END_TIME] = 0.0;
0790: // duration is computed from end values of start and end
0791: acc[COST] = 0.0;
0792: acc[DANGER] = 0.0;
0793: acc[RISK] = 0.0;
0794: acc[QUANTITY] = 0.0;
0795: acc[INTERVAL] = 0.0;
0796: acc[TOTAL_QUANTITY] = 0.0;
0797: acc[TOTAL_SHIPMENTS] = 0.0;
0798: acc[CUSTOMER_SATISFACTION] = 1.0; // start at best
0799: acc[READINESS] = 1.0;
0800:
0801: boolean ap[] = new boolean[AspectType._ASPECT_COUNT];
0802:
0803: boolean suc = true;
0804: double rating = 0.0;
0805:
0806: if (tst == null)
0807: return null;
0808: int tstSize = tst.size();
0809: if (tstSize == 0)
0810: return null;
0811:
0812: int ignoredTasks = 0;
0813:
0814: String auxqsummary[] = new String[AuxiliaryQueryType.AQTYPE_COUNT];
0815: // initialize all values to UNDEFINED for comparison purposes below.
0816: int aql = auxqsummary.length;
0817: for (int aqs = 0; aqs < aql; aqs++) {
0818: auxqsummary[aqs] = UNDEFINED;
0819: }
0820:
0821: int hash = 0;
0822: for (int i = 0; i < tstSize; i++) {
0823: Task t = tst.getTask(i);
0824:
0825: // Ignore results on DetermineRequirement tasks!
0826: if (t.getVerb().equals(
0827: Constants.Verb.DetermineRequirements)) {
0828: ignoredTasks++;
0829: continue;
0830: }
0831:
0832: AllocationResult ar = tst.getAllocationResult(i);
0833: if (ar == null)
0834: return null; // bail if undefined
0835:
0836: suc = suc && ar.isSuccess();
0837: rating += ar.getConfidenceRating();
0838:
0839: int[] definedaspects = ar.getAspectTypes();
0840: int al = definedaspects.length;
0841: for (int b = 0; b < al; b++) {
0842: // accumulate the values for the defined aspects
0843: switch (definedaspects[b]) {
0844: case START_TIME:
0845: acc[START_TIME] = Math.min(acc[START_TIME], ar
0846: .getValue(START_TIME));
0847: ap[START_TIME] = true;
0848: hash |= (1 << START_TIME);
0849: break;
0850: case END_TIME:
0851: acc[END_TIME] = Math.max(acc[END_TIME], ar
0852: .getValue(END_TIME));
0853: ap[END_TIME] = true;
0854: hash |= (1 << END_TIME);
0855: break;
0856: // compute duration later
0857: case COST:
0858: acc[COST] += ar.getValue(COST);
0859: ap[COST] = true;
0860: hash |= (1 << COST);
0861: break;
0862: case DANGER:
0863: acc[DANGER] = Math.max(acc[DANGER], ar
0864: .getValue(DANGER));
0865: ap[DANGER] = true;
0866: hash |= (1 << DANGER);
0867: break;
0868: case RISK:
0869: acc[RISK] = Math.max(acc[RISK], ar
0870: .getValue(RISK));
0871: ap[RISK] = true;
0872: hash |= (1 << RISK);
0873: break;
0874: case QUANTITY:
0875: acc[QUANTITY] += ar.getValue(QUANTITY);
0876: ap[QUANTITY] = true;
0877: hash |= (1 << QUANTITY);
0878: break;
0879: // for now simply add the repetitve task values
0880: case INTERVAL:
0881: acc[INTERVAL] += ar.getValue(INTERVAL);
0882: ap[INTERVAL] = true;
0883: hash |= (1 << INTERVAL);
0884: break;
0885: case TOTAL_QUANTITY:
0886: acc[TOTAL_QUANTITY] += ar
0887: .getValue(TOTAL_QUANTITY);
0888: ap[TOTAL_QUANTITY] = true;
0889: hash |= (1 << TOTAL_QUANTITY);
0890: break;
0891: case TOTAL_SHIPMENTS:
0892: acc[TOTAL_SHIPMENTS] += ar
0893: .getValue(TOTAL_SHIPMENTS);
0894: ap[TOTAL_SHIPMENTS] = true;
0895: hash |= (1 << TOTAL_SHIPMENTS);
0896: break;
0897: //end of repetitive task specific aspects
0898: case CUSTOMER_SATISFACTION:
0899: acc[CUSTOMER_SATISFACTION] += ar
0900: .getValue(CUSTOMER_SATISFACTION);
0901: ap[CUSTOMER_SATISFACTION] = true;
0902: hash |= (1 << CUSTOMER_SATISFACTION);
0903: break;
0904: case READINESS:
0905: acc[READINESS] = Math.min(acc[READINESS], ar
0906: .getValue(READINESS));
0907: ap[READINESS] = true;
0908: hash |= (1 << READINESS);
0909: break;
0910: }
0911: }
0912:
0913: // Sum up the auxiliaryquery data. If there are conflicting data
0914: // values, send back nothing for that type. If only one subtask
0915: // has information about a querytype, send it back in the
0916: // aggregated result.
0917: for (int aq = 0; aq < AuxiliaryQueryType.AQTYPE_COUNT; aq++) {
0918: String data = ar.auxiliaryQuery(aq);
0919: if (data != null) {
0920: String sumdata = auxqsummary[aq];
0921: // if sumdata = null, there has already been a conflict.
0922: if (sumdata != null) {
0923: if (sumdata.equals(UNDEFINED)) {
0924: // there's not a value yet, so use this one.
0925: auxqsummary[aq] = data;
0926: } else if (!data.equals(sumdata)) {
0927: // there's a conflict, pass back null
0928: auxqsummary[aq] = null;
0929: }
0930: }
0931: }
0932: }
0933:
0934: } // end of looping through all subtasks
0935:
0936: // compute duration IFF defined.
0937: if (ap[START_TIME] && ap[END_TIME]) {
0938: acc[DURATION] = acc[END_TIME] - acc[START_TIME];
0939: ap[DURATION] = true;
0940: hash |= (1 << DURATION);
0941: } else {
0942: // redundant
0943: acc[DURATION] = 0.0;
0944: ap[DURATION] = false;
0945: }
0946:
0947: if (tstSize > 0) {
0948: acc[CUSTOMER_SATISFACTION] /= (tstSize - ignoredTasks);
0949: rating /= (tstSize - ignoredTasks);
0950:
0951: // Do not propagate fractional confidence, to avoid
0952: // rounding errors later (no, this shouldnt be happening, but....)
0953: if (rating < 1.0d)
0954: rating = 0.0;
0955: }
0956:
0957: boolean delta = false;
0958:
0959: // only check the defined aspects and make sure that the currentar is not null
0960: if (currentar == null) {
0961: delta = true; // if the current ar == null then set delta true
0962: } else {
0963: int[] caraspects = currentar.getAspectTypes();
0964: if (caraspects.length != acc.length) {
0965: //if the current ar length is different than the length of the new
0966: // calculations (acc) there's been a change
0967: delta = true;
0968: } else {
0969: int il = caraspects.length;
0970: for (int i = 0; i < il; i++) {
0971: int da = caraspects[i];
0972: if (ap[da] && acc[da] != currentar.getValue(da)) {
0973: delta = true;
0974: break;
0975: }
0976: }
0977: }
0978:
0979: if (!delta) {
0980: if (currentar.isSuccess() != suc) {
0981: delta = true;
0982: } else if (Math.abs(currentar.getConfidenceRating()
0983: - rating) > SIGNIFICANT_CONFIDENCE_RATING_DELTA) {
0984: delta = true;
0985: }
0986: }
0987: }
0988:
0989: if (delta) {
0990: int keys[] = _STANDARD_ASPECTS;
0991: int al = AspectType._ASPECT_COUNT;
0992:
0993: // see if we should compress the results array
0994: int lc = 0;
0995: for (int b = 0; b < al; b++) {
0996: if (ap[b])
0997: lc++;
0998: }
0999:
1000: if (lc < al) { // need to compress the arrays
1001: double nv[] = new double[lc];
1002:
1003: // lazy cache the key patterns
1004: synchronized (hack) {
1005: Integer ihash = new Integer(hash);
1006: KeyHolder kh = (KeyHolder) hack.get(ihash);
1007: if (kh == null) {
1008: //System.err.println("Caching key "+hash);
1009: int nk[] = new int[lc];
1010: int i = 0;
1011: for (int b = 0; b < al; b++) {
1012: if (ap[b]) {
1013: nv[i] = acc[b];
1014: nk[i] = keys[b];
1015: i++;
1016: }
1017: }
1018: acc = nv;
1019: keys = nk;
1020: kh = new KeyHolder(nk);
1021: hack.put(ihash, kh);
1022: } else {
1023: keys = kh.keys;
1024: int i = 0;
1025: for (int b = 0; b < al; b++) {
1026: if (ap[b]) {
1027: nv[i] = acc[b];
1028: i++;
1029: }
1030: }
1031: acc = nv;
1032: }
1033: }
1034:
1035: }
1036:
1037: AllocationResult artoreturn = new AllocationResult(
1038: rating, suc, keys, acc);
1039:
1040: for (int aqt = 0; aqt < aql; aqt++) {
1041: String aqdata = auxqsummary[aqt];
1042: if ((aqdata != null) && (aqdata != UNDEFINED)) {
1043: artoreturn.addAuxiliaryQueryInfo(aqt, aqdata);
1044: }
1045: }
1046: return artoreturn;
1047: } else {
1048: return currentar;
1049: }
1050: }
1051: }
1052:
// Map with the same name as the key-pattern cache used by
// GLSExpansionAggregator.calculate (Integer bit mask -> KeyHolder).
// NOTE(review): this is an instance field, which the static nested
// aggregator cannot reference; the aggregator presumably resolves
// "hack" from AllocationResultAggregator instead, which would make this
// copy unused - confirm before removing.
HashMap hack = new HashMap();

// Simple holder for an array of aspect keys.
// NOTE(review): an inner (non-static) class, so presumably not the
// KeyHolder the static aggregator instantiates - confirm.
final class KeyHolder {
    // the aspect keys, stored without copying
    public int[] keys;

    public KeyHolder(int keys[]) {
        this.keys = keys;
    }
}
1062: }
|