0001: /*
0002: * <copyright>
0003: *
0004: * Copyright 2002-2004 BBNT Solutions, LLC
0005: * under sponsorship of the Defense Advanced Research Projects
0006: * Agency (DARPA).
0007: *
0008: * You can redistribute this software and/or modify it under the
0009: * terms of the Cougaar Open Source License as published on the
0010: * Cougaar Open Source Website (www.cougaar.org).
0011: *
0012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023: *
0024: * </copyright>
0025: */
0026:
0027: package org.cougaar.servicediscovery.plugin;
0028:
0029: import org.cougaar.core.blackboard.IncrementalSubscription;
0030: import org.cougaar.core.mts.MessageAddress;
0031: import org.cougaar.core.persist.PersistenceNotEnabledException;
0032: import org.cougaar.core.service.EventService;
0033: import org.cougaar.core.service.LoggingService;
0034: import org.cougaar.planning.ldm.asset.Asset;
0035: import org.cougaar.planning.ldm.asset.NewRelationshipPG;
0036: import org.cougaar.planning.ldm.plan.AllocationResult;
0037: import org.cougaar.planning.ldm.plan.Disposition;
0038: import org.cougaar.planning.ldm.plan.PlanElement;
0039: import org.cougaar.planning.ldm.plan.Preference;
0040: import org.cougaar.planning.ldm.plan.PrepositionalPhrase;
0041: import org.cougaar.planning.ldm.plan.Role;
0042: import org.cougaar.planning.ldm.plan.ScheduleElement;
0043: import org.cougaar.planning.ldm.plan.Task;
0044: import org.cougaar.planning.plugin.legacy.SimplePlugin;
0045: import org.cougaar.planning.plugin.util.PluginHelper;
0046: import org.cougaar.servicediscovery.Constants;
0047: import org.cougaar.servicediscovery.SDDomain;
0048: import org.cougaar.servicediscovery.SDFactory;
0049: import org.cougaar.servicediscovery.description.Lineage;
0050: import org.cougaar.servicediscovery.description.LineageEchelonScorer;
0051: import org.cougaar.servicediscovery.description.MMRoleQuery;
0052: import org.cougaar.servicediscovery.description.ProviderCapabilities;
0053: import org.cougaar.servicediscovery.description.ScoredServiceDescription;
0054: import org.cougaar.servicediscovery.description.ServiceClassification;
0055: import org.cougaar.servicediscovery.description.ServiceContract;
0056: import org.cougaar.servicediscovery.description.ServiceDescription;
0057: import org.cougaar.servicediscovery.description.ServiceInfoScorer;
0058: import org.cougaar.servicediscovery.description.ServiceRequest;
0059: import org.cougaar.servicediscovery.description.ServiceRequestImpl;
0060: import org.cougaar.servicediscovery.transaction.MMQueryRequest;
0061: import org.cougaar.servicediscovery.transaction.ServiceContractRelay;
0062: import org.cougaar.servicediscovery.util.LineageTimeSpan;
0063: import org.cougaar.servicediscovery.util.UDDIConstants;
0064: import org.cougaar.util.NonOverlappingTimeSpanSet;
0065: import org.cougaar.util.PropertyParser;
0066: import org.cougaar.util.TimeSpan;
0067: import org.cougaar.util.TimeSpanSet;
0068: import org.cougaar.util.TimeSpans;
0069: import org.cougaar.util.UnaryPredicate;
0070:
0071: import java.text.SimpleDateFormat;
0072: import java.util.ArrayList;
0073: import java.util.Collection;
0074: import java.util.Collections;
0075: import java.util.Date;
0076: import java.util.Enumeration;
0077: import java.util.HashMap;
0078: import java.util.Iterator;
0079: import java.util.List;
0080: import java.util.Map;
0081: import java.util.Set;
0082:
0083: /**
0084: * Look up in YP agents providing roles given as parameters,
0085: * by creating MMRequests for these. When it gets answers,
0086: * create relays with those agents asking for the service.
0087: * Keep asking until have contracts in the relays for all roles
0088: * and all time intervals.
0089: *
0090: */
0091: public class SDClientPlugin extends SimplePlugin {
0092: private static int WARNING_SUPPRESSION_INTERVAL = 4;
0093: private static final String CLIENT_GRACE_PERIOD = "org.cougaar.servicediscovery.plugin.ClientGracePeriod";
0094: private long myWarningCutoffTime = -1;
0095:
0096: private IncrementalSubscription mySelfOrgSubscription;
0097: private IncrementalSubscription myMMRequestSubscription;
0098: protected IncrementalSubscription myServiceContractRelaySubscription;
0099: private IncrementalSubscription myFindProvidersTaskSubscription;
0100: private IncrementalSubscription myLineageSubscription;
0101:
0102: /** for knowing when we get our self org asset **/
0103: private Asset mySelfOrg = null;
0104:
0105: protected LoggingService myLoggingService;
0106: protected EventService myEventService;
0107:
0108: protected SDFactory mySDFactory;
0109:
0110: private NonOverlappingTimeSpanSet myOPCONSchedule = null;
0111:
0112: private Map myRoles = new HashMap();
0113: /**
0114: * RFE 3162: Set to true to force a persistence as soon as the agent
0115: * has finished finding providers. Send a CougaarEvent announcing completion.
0116: * This allows controllers to kill the agent as early as possible, without
0117: * potentially causing logistics plugin problems on rehydration.
0118: * Defaults to false - ie, do not force an early persist.
0119: **/
0120: private static final boolean PERSIST_EARLY;
0121:
0122: static {
0123: PERSIST_EARLY = PropertyParser
0124: .getBoolean(
0125: "org.cougaar.servicediscovery.plugin.SDClientPlugin.persistEarly",
0126: false);
0127: }
0128:
0129: private boolean myNeedToFindProviders = true;
0130:
0131: private static final UnaryPredicate mySelfOrgPred = new SelfOrgPredicate();
0132:
0133: private static final class SelfOrgPredicate implements
0134: UnaryPredicate {
0135: public boolean execute(Object o) {
0136: if (o instanceof Asset) {
0137: Asset org = (Asset) o;
0138: if (org.hasRelationshipPG()
0139: && ((NewRelationshipPG) org.getRelationshipPG())
0140: .getRelationshipBG() != null
0141: && org.getRelationshipPG().isLocal()) {
0142: return true;
0143: }
0144: }
0145: return false;
0146: }
0147: }
0148:
0149: // Subscription to ServiceContractRelays for which I am the client!
0150: private final UnaryPredicate myServiceContractRelayPred = new ServiceContractRelayPredicate();
0151:
0152: private final class ServiceContractRelayPredicate implements
0153: UnaryPredicate {
0154: public boolean execute(Object o) {
0155: return ((o instanceof ServiceContractRelay) && ((ServiceContractRelay) o)
0156: .getClient().equals(getSelfOrg()));
0157: }
0158: }
0159:
0160: private static final UnaryPredicate myMMRequestPred = new MMRequestPredicate();
0161:
0162: private static final class MMRequestPredicate implements
0163: UnaryPredicate {
0164: public boolean execute(Object o) {
0165: return (o instanceof MMQueryRequest);
0166: }
0167: }
0168:
0169: private static final UnaryPredicate myFindProvidersTaskPred = new FindProvidersTaskPredicate();
0170:
0171: private static final class FindProvidersTaskPredicate implements
0172: UnaryPredicate {
0173: public boolean execute(Object o) {
0174: return ((o instanceof Task) && (((Task) o).getVerb()
0175: .equals(Constants.Verbs.FindProviders)));
0176: }
0177: }
0178:
0179: private static final UnaryPredicate myLineagePred = new OpconLineagePredicate();
0180:
0181: private static final class OpconLineagePredicate implements
0182: UnaryPredicate {
0183: public boolean execute(Object o) {
0184: return ((o instanceof Lineage) && (((Lineage) o).getType() == Lineage.OPCON));
0185: }
0186: }
0187:
0188: private static final UnaryPredicate myProviderCapabilitiesPred = new ProviderCapabilitiesPredicate();
0189:
0190: private static final class ProviderCapabilitiesPredicate implements
0191: UnaryPredicate {
0192: public boolean execute(Object o) {
0193: return (o instanceof ProviderCapabilities);
0194: }
0195: }
0196:
0197: protected void setupSubscriptions() {
0198: mySelfOrgSubscription = (IncrementalSubscription) subscribe(mySelfOrgPred);
0199: myMMRequestSubscription = (IncrementalSubscription) subscribe(myMMRequestPred);
0200: myServiceContractRelaySubscription = (IncrementalSubscription) subscribe(myServiceContractRelayPred);
0201: myFindProvidersTaskSubscription = (IncrementalSubscription) subscribe(myFindProvidersTaskPred);
0202: myLineageSubscription = (IncrementalSubscription) subscribe(myLineagePred);
0203:
0204: myLoggingService = (LoggingService) getBindingSite()
0205: .getServiceBroker().getService(this ,
0206: LoggingService.class, null);
0207:
0208: // get event service
0209: myEventService = (EventService) getBindingSite()
0210: .getServiceBroker().getService(this ,
0211: EventService.class, null);
0212:
0213: mySDFactory = (SDFactory) getFactory(SDDomain.SD_NAME);
0214:
0215: setOPCONSchedule(buildOPCONSchedule());
0216:
0217: setNeedToFindProviders(true);
0218:
0219: initializeNumberOfProvidersPerRole();
0220:
0221: if (didRehydrate()) {
0222: if (needToFindProviders()) {
0223: findProviders();
0224: }
0225: }
0226: }
0227:
0228: public void execute() {
0229: boolean OPCONScheduleChanged = false;
0230:
0231: if (myLineageSubscription.hasChanged()) {
0232: // handleChangedLineage calls wake() if there is an OPCON schedule change thereby
0233: // guaranteeing another execute() cycle after all of its changes to MMQueryRequests and
0234: // ServiceContractRelay have been committed.
0235: OPCONScheduleChanged = handleChangedLineage();
0236: }
0237:
0238: // If OPCONScheduleChanged, defer finding providers since
0239: // myServiceContractRelaySubscription will still contain any relays which were publish
0240: // removed in handleChangedLineage(). Wait for the subsequent execute() cycle triggered
0241: // by handleChangedLineage() call to wake().
0242: if (needToFindProviders() && !OPCONScheduleChanged) {
0243: if (myLoggingService.isDebugEnabled()) {
0244: myLoggingService.debug(getAgentIdentifier()
0245: + ": execute - needToFindProviders.");
0246: }
0247:
0248: findProviders();
0249: }
0250:
0251: if (myFindProvidersTaskSubscription.hasChanged()) {
0252: if (myLoggingService.isDebugEnabled()) {
0253: myLoggingService.debug("find providers changed, now "
0254: + myFindProvidersTaskSubscription.size()
0255: + " tasks");
0256: }
0257: updateFindProvidersTaskDispositions();
0258: }
0259:
0260: // If matchmaker has new possible providers, look at options,
0261: // and generate the relays possibly
0262: if (myMMRequestSubscription.hasChanged()) {
0263: generateServiceRequests(myMMRequestSubscription
0264: .getChangedCollection());
0265: }
0266:
0267: //if your relays have changed, check for revokes
0268: if (myServiceContractRelaySubscription.hasChanged()) {
0269: Collection changedRelays = myServiceContractRelaySubscription
0270: .getChangedCollection();
0271:
0272: handleChangedServiceContractRelays(changedRelays);
0273:
0274: // Update disposition on FindProviders task
0275: if (changedRelays.size() > 0) {
0276: if (myLoggingService.isDebugEnabled()) {
0277: myLoggingService.debug(getAgentIdentifier()
0278: + " changedRelays.size = "
0279: + changedRelays.size()
0280: + ", updateFindProvidersTaskDispositions");
0281: }
0282: updateFindProvidersTaskDispositions();
0283: }
0284:
0285: Collection removed = myServiceContractRelaySubscription
0286: .getRemovedCollection();
0287:
0288: for (Iterator removedIterator = removed.iterator(); removedIterator
0289: .hasNext();) {
0290: ServiceContract removedContract = ((ServiceContractRelay) removedIterator
0291: .next()).getServiceContract();
0292:
0293: if (removedContract != null) {
0294: TimeSpan timeSpan = SDFactory
0295: .getTimeSpanFromPreferences(removedContract
0296: .getServicePreferences());
0297:
0298: Role role = removedContract.getServiceRole();
0299: if (!checkProviderCompletelyRequested(role,
0300: timeSpan)) {
0301: queryServices(role, timeSpan);
0302: }
0303: }
0304: }
0305: }
0306:
0307: } // end of execute method
0308:
0309: protected IncrementalSubscription getMMRequestSubscription() {
0310: return myMMRequestSubscription;
0311: }
0312:
0313: /**
0314: * If a changed relay is revoked and you are the client, do a new
0315: * service query for this role
0316: */
0317: protected void handleChangedServiceContractRelays(
0318: Collection changedRelays) {
0319: for (Iterator iterator = changedRelays.iterator(); iterator
0320: .hasNext();) {
0321: ServiceContractRelay relay = (ServiceContractRelay) iterator
0322: .next();
0323:
0324: //you want to take action if you are the client agent
0325: //and the service contract was revoked
0326: if (relay.getServiceContract().isRevoked()
0327: && relay.getClient().equals(getSelfOrg())) {
0328: //do a new service query
0329: //What had the contract covered?
0330: TimeSpan timeSpan = SDFactory
0331: .getTimeSpanFromPreferences(relay
0332: .getServiceRequest()
0333: .getServicePreferences());
0334: queryServices(relay.getServiceContract()
0335: .getServiceRole(), timeSpan);
0336: //publishRemove(relay);
0337: }
0338: }
0339:
0340: }
0341:
0342: /**
0343: * create & publish a relay with service request to the provider specified in
0344: * the serviceDescription for the specified time interval.
0345: */
0346: protected void requestServiceContract(
0347: ServiceDescription serviceDescription, TimeSpan interval) {
0348:
0349: // RHB should this be depricated/removed?
0350: Role role = getRole(serviceDescription);
0351: String providerName = serviceDescription.getProviderName();
0352: requestServiceContract(role, providerName, interval);
0353: }
0354:
0355: protected void requestServiceContract(Role role,
0356: String providerName, TimeSpan interval) {
0357: if (role == null) {
0358: if (myLoggingService.isWarnEnabled()) {
0359: myLoggingService
0360: .warn(getAgentIdentifier()
0361: + ": error requesting service contract: a null role");
0362: }
0363: } else {
0364: ServiceRequest request = mySDFactory.newServiceRequest(
0365: getSelfOrg(), role, mySDFactory
0366: .createTimeSpanPreferences(interval));
0367:
0368: ServiceContractRelay relay = mySDFactory
0369: .newServiceContractRelay(MessageAddress
0370: .getMessageAddress(providerName), request);
0371: if (myLoggingService.isDebugEnabled()) {
0372: myLoggingService
0373: .debug(getAgentIdentifier()
0374: + ": requestServiceContract() publish relay to "
0375: + providerName + " asking for role: "
0376: + role + " from "
0377: + new Date(interval.getStartTime())
0378: + " to "
0379: + new Date(interval.getEndTime()));
0380: }
0381: publishAdd(relay);
0382: }
0383: }
0384:
0385: // Get the self org out of the subscription
0386: protected Asset getSelfOrg() {
0387: if (mySelfOrg == null) {
0388: for (Iterator iterator = mySelfOrgSubscription.iterator(); iterator
0389: .hasNext();) {
0390: mySelfOrg = (Asset) iterator.next();
0391: }
0392: }
0393:
0394: return mySelfOrg;
0395: }
0396:
0397: protected String getMinimumEchelon(Role role) {
0398: Collection collection = getBlackboardService().query(
0399: myProviderCapabilitiesPred);
0400:
0401: ProviderCapabilities capabilities = null;
0402: if (collection.size() > 0) {
0403: Iterator iterator = collection.iterator();
0404: capabilities = (ProviderCapabilities) iterator.next();
0405: if (iterator.hasNext()) {
0406: myLoggingService
0407: .warn(getAgentIdentifier()
0408: + " getMinimumEchelon: multiple ProviderCapabilities found."
0409: + " Using - " + capabilities);
0410: }
0411: }
0412:
0413: return LineageEchelonScorer.getMinimumEchelonOfSupport(
0414: capabilities, role);
0415: }
0416:
0417: protected Lineage getCommandLineage(TimeSpan timeSpan) {
0418: Collection matches = getOPCONSchedule().intersectingSet(
0419: timeSpan);
0420:
0421: Lineage commandLineage = null;
0422:
0423: switch (matches.size()) {
0424:
0425: case 0:
0426: break;
0427:
0428: case 1:
0429: LineageTimeSpan lineageTimeSpan = (LineageTimeSpan) (matches
0430: .iterator().next());
0431: commandLineage = lineageTimeSpan.getLineage();
0432: break;
0433:
0434: default:
0435: if (myLoggingService.isDebugEnabled()) {
0436: myLoggingService.debug(getAgentIdentifier()
0437: + " getCommandLineage: OPCON schedule has "
0438: + matches.size() + " " + matches
0439: + " elements overlapping " + timeSpan);
0440: }
0441: break;
0442: }
0443:
0444: if (myLoggingService.isDebugEnabled()) {
0445: myLoggingService.debug(getAgentIdentifier()
0446: + ": getCommandLineage: returning - "
0447: + commandLineage + " for " + timeSpan);
0448: }
0449:
0450: return commandLineage;
0451: }
0452:
0453: protected TimeSpan getOPCONTimeSpan() {
0454: if ((getOPCONSchedule().first() != null)
0455: && (getOPCONSchedule().last() != null)) {
0456: TimeSpan opconTimeSpan = TimeSpans
0457: .getSpan(((TimeSpan) getOPCONSchedule().first())
0458: .getStartTime(),
0459: ((TimeSpan) getOPCONSchedule().last())
0460: .getEndTime());
0461: return opconTimeSpan;
0462: } else {
0463: return SDFactory.DEFAULT_TIME_SPAN;
0464: }
0465: }
0466:
0467: protected void queryServices(Role role) {
0468: queryServices(role, getOPCONTimeSpan());
0469: }
0470:
0471: /**
0472: * create and publish a MMQueryRequest for this role
0473: */
0474: protected void queryServices(Role role, TimeSpan timeSpan) {
0475: if (myLoggingService.isDebugEnabled()) {
0476: myLoggingService.debug(getAgentIdentifier()
0477: + ": queryServices() role = " + role
0478: + " timeSpan = "
0479: + new Date(timeSpan.getStartTime()) + " to "
0480: + new Date(timeSpan.getEndTime()));
0481: }
0482:
0483: String minimumEchelon = getMinimumEchelon(role);
0484:
0485: Collection opconLineages = getOPCONSchedule().intersectingSet(
0486: timeSpan);
0487:
0488: if (opconLineages.size() == 0) {
0489: if (myLoggingService.isDebugEnabled()) {
0490: myLoggingService
0491: .debug(getAgentIdentifier()
0492: + ": queryServices: no OPCON Lineage on blackboard"
0493: + "for requested time span - "
0494: + timeSpan.getStartTime()
0495: + " to "
0496: + timeSpan.getEndTime()
0497: + ". Making single element OPCON lineage.");
0498: }
0499: // Build a 1 node opcon list
0500: ArrayList list = new ArrayList();
0501: list.add(getAgentIdentifier().toString());
0502:
0503: LineageTimeSpan lineageTimeSpan = new LineageTimeSpan(
0504: mySDFactory.newLineage(Lineage.OPCON, list,
0505: SDFactory.DEFAULT_TIME_SPAN),
0506: SDFactory.DEFAULT_TIME_SPAN);
0507: opconLineages = new ArrayList();
0508: opconLineages.add(lineageTimeSpan);
0509: }
0510:
0511: if (myLoggingService.isDebugEnabled()) {
0512: myLoggingService.debug(getAgentIdentifier()
0513: + ": queryServices() OPCON schedule = "
0514: + getOPCONSchedule()
0515: + " OPCON schedule intersectingSet = "
0516: + opconLineages);
0517: }
0518:
0519: for (Iterator iterator = opconLineages.iterator(); iterator
0520: .hasNext();) {
0521: LineageTimeSpan opconTimeSpan = (LineageTimeSpan) iterator
0522: .next();
0523:
0524: LineageEchelonScorer scorer = new LineageEchelonScorer(
0525: opconTimeSpan.getLineage(), minimumEchelon, role);
0526: queryServices(role, scorer, opconTimeSpan);
0527: }
0528: }
0529:
0530: /**
0531: * create and publish a MMQueryRequest for this role
0532: */
0533: protected void queryServices(Role role, ServiceInfoScorer scorer,
0534: TimeSpan timeSpan) {
0535: boolean outstandingQuery = false;
0536:
0537: // Check to make sure we don't already have an outstanding request for this time span
0538: for (Iterator queryIterator = myMMRequestSubscription
0539: .iterator(); queryIterator.hasNext();) {
0540: MMQueryRequest request = (MMQueryRequest) queryIterator
0541: .next();
0542:
0543: if (matchingRequest(request, role, timeSpan, scorer)) {
0544: outstandingQuery = true;
0545:
0546: if (myLoggingService.isDebugEnabled()) {
0547: myLoggingService
0548: .debug(getAgentIdentifier()
0549: + " ignoring call to ask for MatchMaker for role : "
0550: + role + " - serviceInfoScorer : "
0551: + scorer + " - timeSpan : "
0552: + timeSpan
0553: + ". Outstanding MMQuery - "
0554: + request.getQuery()
0555: + " - already exists.");
0556: }
0557: break;
0558: }
0559: }
0560:
0561: if (!outstandingQuery) {
0562: MMQueryRequest mmRequest = mySDFactory
0563: .newMMQueryRequest(new MMRoleQuery(role, scorer,
0564: timeSpan));
0565: publishAdd(mmRequest);
0566:
0567: if (myLoggingService.isDebugEnabled()) {
0568: myLoggingService.debug(getAgentIdentifier()
0569: + " asking MatchMaker for role : " + role
0570: + " - serviceInfoScorer : " + scorer
0571: + " - timeSpan : " + timeSpan
0572: + " - MMQueryRequest : " + mmRequest.getUID());
0573: }
0574: }
0575: }
0576:
0577: protected void setNeedToFindProviders(boolean flag) {
0578: myNeedToFindProviders = flag;
0579: }
0580:
0581: // If we have not yet created the service requests, and we're
0582: // in an OPLAN stage where we should do work, return true
0583: protected boolean needToFindProviders() {
0584: return ((myNeedToFindProviders) && (!myFindProvidersTaskSubscription
0585: .isEmpty()));
0586: }
0587:
0588: // For each role in parameters, generate a MMQueryRequest
0589: protected void findProviders() {
0590: Collection roleParams = parseRoleParams();
0591:
0592: for (Iterator iterator = roleParams.iterator(); iterator
0593: .hasNext();) {
0594: Role role = Role.getRole((String) iterator.next());
0595:
0596: if (!checkProviderCompletelyRequested(role,
0597: getOPCONTimeSpan())) {
0598: queryServices(role);
0599:
0600: }
0601: }
0602: setNeedToFindProviders(false);
0603: }
0604:
0605: protected Collection parseRoleParams() {
0606: Collection params = getDelegate().getParameters();
0607: ArrayList roleparams = new ArrayList(1);
0608:
0609: for (Iterator iterator = params.iterator(); iterator.hasNext();) {
0610: String fullParam = (String) iterator.next();
0611: if (fullParam.indexOf(":") > 0) {
0612: roleparams.add(fullParam.substring(0, fullParam
0613: .indexOf(":")));
0614: } else {
0615: roleparams.add(fullParam);
0616: }
0617: }
0618: return roleparams;
0619: }
0620:
0621: protected void initializeNumberOfProvidersPerRole() {
0622: Collection params = getDelegate().getParameters();
0623:
0624: for (Iterator iterator = params.iterator(); iterator.hasNext();) {
0625: String fullParam = (String) iterator.next();
0626: int endRoleIndex;
0627: if (fullParam.indexOf(":") > 0) {
0628: endRoleIndex = fullParam.indexOf(":");
0629: Role desiredRole = Role.getRole(fullParam.substring(0,
0630: endRoleIndex));
0631: String numProviders = fullParam.substring(
0632: endRoleIndex + 1, fullParam.length());
0633: if (myLoggingService.isInfoEnabled()) {
0634: myLoggingService.info(getAgentIdentifier()
0635: + " numProviders desired for role "
0636: + desiredRole + " is " + numProviders);
0637: }
0638: Integer i = new Integer(numProviders);
0639: if (i != null) {
0640: myRoles.put(desiredRole, i);
0641: }
0642: }
0643: }
0644: }
0645:
0646: /**
0647: * For each answered request, pick an appropriate provider
0648: * and send them a service request.
0649: * Note that input is the changed service requests only.
0650: * Also need HashMap of number of providers desired for each role.
0651: */
0652: protected void generateServiceRequests(Collection mmRequests) {
0653: for (Iterator iterator = mmRequests.iterator(); iterator
0654: .hasNext();) {
0655: MMQueryRequest mmRequest = (MMQueryRequest) iterator.next();
0656: MMRoleQuery query = (MMRoleQuery) mmRequest.getQuery();
0657:
0658: if (myLoggingService.isDebugEnabled()) {
0659: myLoggingService.debug(getAgentIdentifier()
0660: + ": generateServiceRequests() MMQueryRequest "
0661: + " has changed: " + mmRequest.getUID()
0662: + " query = " + query);
0663: }
0664:
0665: if (query.getObsolete()) {
0666: if (myLoggingService.isDebugEnabled()) {
0667: myLoggingService
0668: .debug(getAgentIdentifier()
0669: + ": generateServiceRequests() ignoring obsolete request - "
0670: + mmRequest.getUID());
0671: }
0672: continue;
0673: }
0674:
0675: // Only do anything if the query has a result
0676: Collection services = mmRequest.getResult();
0677:
0678: if ((services == null) || (services.size() == 0)) {
0679: // MMPlugin said no one matched?
0680: if (System.currentTimeMillis() > getWarningCutoffTime()) {
0681: myLoggingService
0682: .error(getAgentIdentifier()
0683: + ": generateServiceRequests() got 0 results"
0684: + " for query - " + query);
0685: } else if (myLoggingService.isDebugEnabled()) {
0686: myLoggingService
0687: .debug(getAgentIdentifier()
0688: + ": generateServiceRequests() got 0 results"
0689: + " for query - " + query);
0690: }
0691: } else {
0692: Role role = query.getRole();
0693: TimeSpan timeSpan = query.getTimeSpan();
0694:
0695: if (myLoggingService.isDebugEnabled()) {
0696: myLoggingService
0697: .debug(getAgentIdentifier()
0698: + ": generateServiceRequests() results for "
0699: + "query - " + query
0700: + " for role = " + role
0701: + ", time span = "
0702: + query.getTimeSpan()
0703: + " - number of avail providers - "
0704: + services.size());
0705: }
0706:
0707: Collection intervals = getCurrentlyUncoveredIntervalsWithoutOutstandingRequests(
0708: timeSpan.getStartTime(), timeSpan.getEndTime(),
0709: role);
0710: if (intervals.size() == 0) {
0711: if (myLoggingService.isDebugEnabled()) {
0712: myLoggingService
0713: .debug(getAgentIdentifier()
0714: + ": generateServiceRequests() no "
0715: + " uncovered time periods for - "
0716: + role
0717: + ". Will not generate a service request.");
0718: }
0719: continue; // on to the next changed MMRequest
0720: }
0721:
0722: int desiredNumberOfProviders = getDesiredNumberOfProviders(role);
0723:
0724: //make sure any ties are resolved in a reliable & consistent order
0725: Collection servicesList = reorderAnyTiedServiceDescriptions(new ArrayList(
0726: services));
0727:
0728: //now, for each interval, pick a provider (service description) and
0729: //request a contract
0730: for (Iterator neededIntervals = intervals.iterator(); neededIntervals
0731: .hasNext();) {
0732: TimeSpan currentInterval = (TimeSpan) neededIntervals
0733: .next();
0734:
0735: boolean madeNewRequest = false;
0736: int numProviderFound = 0;
0737:
0738: for (Iterator serviceIterator = servicesList
0739: .iterator(); serviceIterator.hasNext()
0740: && numProviderFound < desiredNumberOfProviders;) {
0741: ScoredServiceDescription sd = (ScoredServiceDescription) serviceIterator
0742: .next();
0743: //if you have already asked this provider, skip it
0744: if (alreadyAskedForContractWithProvider(role,
0745: sd.getProviderName(), currentInterval)) {
0746: if (myLoggingService.isDebugEnabled()) {
0747: myLoggingService
0748: .debug(getAgentIdentifier()
0749: + " skipping "
0750: + sd.getProviderName()
0751: + " for role: " + role);
0752: }
0753: } else {
0754: //remember that you found a provider to request from
0755: madeNewRequest = true;
0756: numProviderFound++;
0757: //do the request
0758: if (myLoggingService.isDebugEnabled()) {
0759: myLoggingService
0760: .debug(getAgentIdentifier()
0761: + " requesting contract with "
0762: + sd.getProviderName()
0763: + " for role - " + role
0764: + ", time - "
0765: + currentInterval);
0766: }
0767: requestServiceContract(sd, currentInterval);
0768: }
0769: } // end of for loop for number of desired providers
0770:
0771: //if you were not able to find a provider to request from
0772: //for this interval, take appropriate action
0773: if (!madeNewRequest) {
0774: handleRequestWithNoRemainingProviderOption(
0775: role, currentInterval);
0776: }
0777: }// end of for loop over uncovered intervals
0778: } // end of loop over MMRequests
0779: }
0780: }
0781:
0782: /**
0783: * Modify the order of scoredServiceDescriptions so that ties are in the
0784: * order you want them to be
0785: */
0786: protected Collection reorderAnyTiedServiceDescriptions(
0787: ArrayList scoredServiceDescriptions) {
0788: return scoredServiceDescriptions;
0789: //do nothing, trust the matchmaker ordering
0790: }
0791:
0792: /**
0793: * return true if you already have a service contract relay with this
0794: * provider
0795: */
0796: protected boolean alreadyAskedForContractWithProvider(Role role,
0797: String providerName, TimeSpan timeSpan) {
0798: for (Iterator relayIterator = myServiceContractRelaySubscription
0799: .iterator(); relayIterator.hasNext();) {
0800: ServiceContractRelay relay = (ServiceContractRelay) relayIterator
0801: .next();
0802: if (relay.getProviderName().equals(providerName)
0803: && relay.getClient().equals(getSelfOrg())
0804: && relay.getServiceRequest().getServiceRole()
0805: .equals(role)) {
0806: // Did we ask for the same time period?
0807: TimeSpan requestedTimeSpan = SDFactory
0808: .getTimeSpanFromPreferences(relay
0809: .getServiceRequest()
0810: .getServicePreferences());
0811: return (requestedTimeSpan.equals(timeSpan));
0812: }
0813: }
0814: return false;
0815: }
0816:
0817: /**
0818: * Log a warning that you couldn't find a provider for this option
0819: */
0820: protected void handleRequestWithNoRemainingProviderOption(
0821: Role role, TimeSpan currentInterval) {
0822: //this means you have a time interval where you have exhausted all possible
0823: //providers. Log a warning.
0824: if (myLoggingService.isWarnEnabled()) {
0825: myLoggingService
0826: .warn(getAgentIdentifier()
0827: + " failed to get contract for "
0828: + role
0829: + " for time period from "
0830: + new java.util.Date(currentInterval
0831: .getStartTime())
0832: + " to "
0833: + new java.util.Date(currentInterval
0834: .getEndTime()));
0835: }
0836: }
0837:
0838: /**
0839: * For this role, check if there is a non-revoked service contract or an
0840: * unanswered request. If so, return an empty collection. If not, return
0841: * a collection containing the time interval between desiredStart and
0842: * desiredEnd.
0843: */
0844: protected Collection getCurrentlyUncoveredIntervalsWithoutOutstandingRequests(
0845: long desiredStart, long desiredEnd, Role role) {
0846:
0847: ArrayList ret = new ArrayList();
0848: TimeSpan timeSpan = TimeSpans.getSpan(desiredStart, desiredEnd);
0849: if (!checkProviderCompletelyRequested(role, timeSpan)) {
0850: ret.add(timeSpan);
0851: }
0852: return ret;
0853: }
0854:
0855: /**
0856: * return true if you have a non-revoked service contract for
0857: * this role or if you have an unanswered request
0858: */
0859: protected boolean checkProviderCompletelyRequested(Role role,
0860: TimeSpan timeSpan) {
0861: if (timeSpan == null) {
0862: return false;
0863: }
0864:
0865: return requested(role, timeSpan);
0866: }
0867:
0868: /**
0869: * return true if you have a non-revoked service contract for
0870: * this role or if you have an unanswered request
0871: */
0872: protected boolean checkProviderCompletelyCovered(Role role,
0873: TimeSpan timeSpan) {
0874: if (timeSpan == null) {
0875: return false;
0876: }
0877:
0878: return covered(role, timeSpan);
0879: }
0880:
0881: protected boolean covered(Role role, TimeSpan timeSpan) {
0882:
0883: TimeSpanSet contractTimeSpanSet = new TimeSpanSet();
0884:
0885: for (Iterator relayIterator = myServiceContractRelaySubscription
0886: .iterator(); relayIterator.hasNext();) {
0887: ServiceContractRelay relay = (ServiceContractRelay) relayIterator
0888: .next();
0889:
0890: ServiceContract contract = relay.getServiceContract();
0891: if ((contract != null) && (!contract.isRevoked())
0892: && (relay.getClient().equals(getSelfOrg()))
0893: && (contract.getServiceRole().equals(role))) {
0894: TimeSpan contractTimeSpan = SDFactory
0895: .getTimeSpanFromPreferences(contract
0896: .getServicePreferences());
0897: contractTimeSpanSet.add(contractTimeSpan);
0898: } else if (myLoggingService.isDebugEnabled()) {
0899: if (contract == null) {
0900: myLoggingService
0901: .warn("\tskipping contract for relay "
0902: + relay + " b/c it's null");
0903: } else if (contract.isRevoked()) {
0904: myLoggingService.warn("\tskipping contract "
0905: + contract + " b/c revoked");
0906: } else if (!relay.getClient().equals(getSelfOrg())) {
0907: myLoggingService.warn("\tskipping contract "
0908: + contract + " b/c client "
0909: + relay.getClient() + " not self");
0910: } else if (!contract.getServiceRole().equals(role)) {
0911: myLoggingService.warn("\tskipping contract "
0912: + contract + " b/c client "
0913: + contract.getServiceRole() + " not "
0914: + role);
0915: }
0916: }
0917: }
0918:
0919: boolean continuous = continuousCoverage(timeSpan,
0920: contractTimeSpanSet);
0921:
0922: if (!continuous && myLoggingService.isInfoEnabled()) {
0923: StringBuffer buf = new StringBuffer();
0924: buf.append("Time span not continuous :\n");
0925: for (int i = 0; i < contractTimeSpanSet.size(); i++) {
0926: TimeSpan span = (TimeSpan) contractTimeSpanSet.get(i);
0927: buf.append(i);
0928: buf.append(" ");
0929: buf.append(span);
0930: buf.append("\n");
0931: }
0932: myLoggingService.info(buf.toString());
0933: }
0934:
0935: return continuous;
0936:
0937: }
0938:
0939: protected boolean requested(Role role, TimeSpan timeSpan) {
0940:
0941: TimeSpanSet requestTimeSpanSet = new TimeSpanSet();
0942:
0943: for (Iterator relayIterator = myServiceContractRelaySubscription
0944: .iterator(); relayIterator.hasNext();) {
0945: ServiceContractRelay relay = (ServiceContractRelay) relayIterator
0946: .next();
0947:
0948: ServiceRequest request = relay.getServiceRequest();
0949:
0950: if (relay.getClient().equals(getSelfOrg())
0951: && request.getServiceRole().equals(role)) {
0952: TimeSpan requestTimeSpan = SDFactory
0953: .getTimeSpanFromPreferences(request
0954: .getServicePreferences());
0955: requestTimeSpanSet.add(requestTimeSpan);
0956: }
0957: }
0958:
0959: return continuousCoverage(timeSpan, requestTimeSpanSet);
0960: }
0961:
0962: protected boolean continuousCoverage(TimeSpan targetTimeSpan,
0963: TimeSpanSet timeSpanSet) {
0964: if (timeSpanSet.isEmpty()) {
0965: return false;
0966: }
0967:
0968: long currentEarliest = -1;
0969: long currentLatest = -1;
0970: for (Iterator timeSpanIterator = timeSpanSet.iterator(); timeSpanIterator
0971: .hasNext();) {
0972: TimeSpan timeSpan = (TimeSpan) timeSpanIterator.next();
0973: if (currentEarliest == -1) {
0974: currentEarliest = timeSpan.getStartTime();
0975: } else {
0976: currentEarliest = Math.min(currentEarliest, timeSpan
0977: .getStartTime());
0978: }
0979:
0980: if (currentLatest == -1) {
0981: currentLatest = timeSpan.getEndTime();
0982: } else {
0983: if (currentLatest < timeSpan.getStartTime()) {
0984: // Missing coverage
0985: if (myLoggingService.isDebugEnabled()) {
0986: myLoggingService
0987: .debug(getAgentIdentifier()
0988: + ": continuousCoverage() returning false for timeSpan = "
0989: + new Date(timeSpan
0990: .getStartTime())
0991: + " to "
0992: + new Date(timeSpan
0993: .getEndTime())
0994: + ". Gap in coverage detected at "
0995: + new Date(currentLatest));
0996: }
0997: return false;
0998: } else {
0999: currentLatest = Math.max(currentLatest, timeSpan
1000: .getEndTime());
1001: }
1002: }
1003: }
1004:
1005: return (currentEarliest <= targetTimeSpan.getStartTime() && currentLatest >= targetTimeSpan
1006: .getEndTime());
1007: }
1008:
1009: protected boolean handleChangedLineage() {
1010:
1011: NonOverlappingTimeSpanSet currentOPCONSchedule = buildOPCONSchedule();
1012:
1013: if (currentOPCONSchedule.isEmpty()) {
1014: if (myLoggingService.isDebugEnabled()) {
1015: myLoggingService
1016: .debug(getAgentIdentifier()
1017: + ": handleChangedLineage() "
1018: + " unable to build OPCON schedule from the current lineages.");
1019: }
1020:
1021: // Don't replace the existing OPCON schedule. Wait until we have consistent OPCON
1022: // info. Assumption is that LineagePlugin is modifying lineages and will trigger
1023: // another round of processing via a publish add/change/remove.
1024: return false;
1025: }
1026:
1027: if (currentOPCONSchedule.equals(getOPCONSchedule())) {
1028: if (myLoggingService.isDebugEnabled()) {
1029: myLoggingService
1030: .debug(getAgentIdentifier()
1031: + ": handleChangedLineage() "
1032: + " change in lineage subscription - no change in OPCON schedule. "
1033: + " currentOPCONSchedule = "
1034: + currentOPCONSchedule
1035: + " previous OPCON schedule = "
1036: + getOPCONSchedule());
1037: }
1038:
1039: return false;
1040: } else {
1041: if (myLoggingService.isDebugEnabled()) {
1042: myLoggingService.debug(getAgentIdentifier()
1043: + ": handleChangedLineage() "
1044: + "change in OPCON schedule - "
1045: + " currentOPCONSchedule = "
1046: + currentOPCONSchedule
1047: + " previous OPCON schedule = "
1048: + getOPCONSchedule());
1049: }
1050:
1051: verifyOutstandingRequests(currentOPCONSchedule);
1052: verifyServiceContracts(currentOPCONSchedule);
1053:
1054: setOPCONSchedule(currentOPCONSchedule);
1055:
1056: // Ping execute so that we query for missing services
1057: setNeedToFindProviders(true);
1058: wake();
1059: return true;
1060: }
1061: }
1062:
1063: // Log helper: Where have we gotten to?
1064: private String stateMessage() {
1065: String message = "State of plugin - ";
1066:
1067: //what is the status of the find providers task
1068: for (Iterator iterator = myFindProvidersTaskSubscription
1069: .iterator(); iterator.hasNext();) {
1070: Task task = (Task) iterator.next();
1071: PlanElement pe = task.getPlanElement();
1072: if ((pe != null) && (pe.getEstimatedResult() != null)) {
1073: message = message
1074: .concat("\n FindProviders conf: "
1075: + pe.getEstimatedResult()
1076: .getConfidenceRating());
1077: } else {
1078: message = message
1079: .concat("\n FindProviders task has no result yet");
1080: }
1081: }
1082:
1083: //which roles do we need?
1084: message = message.concat("\n The roles needed are: ");
1085: Collection roleparams = parseRoleParams();
1086: for (Iterator iterator = roleparams.iterator(); iterator
1087: .hasNext();) {
1088: Role role = Role.getRole((String) iterator.next());
1089: message = message.concat(role + " ");
1090: }
1091:
1092: //what service contracts relays do we have, and which have contracts
1093: // (replies)?
1094: for (Iterator relayIterator = myServiceContractRelaySubscription
1095: .iterator(); relayIterator.hasNext();) {
1096: ServiceContractRelay relay = (ServiceContractRelay) relayIterator
1097: .next();
1098:
1099: if (relay.getClient().equals(getSelfOrg())) {
1100: message = message
1101: .concat("\n Sent a service contract relay to "
1102: + relay.getProviderName()
1103: + " for the role "
1104: + relay.getServiceRequest()
1105: .getServiceRole());
1106: }
1107: if (relay.getServiceContract() != null
1108: && relay.getClient().equals(getSelfOrg())) {
1109: message = message
1110: .concat(", and the provider has answered.");
1111: } else if (relay.getServiceContract() == null
1112: && relay.getClient().equals(getSelfOrg())) {
1113: message = message
1114: .concat(", but no answer yet from the provider.");
1115: }
1116: }
1117:
1118: // Now look to see what roles we have asked the MM for
1119: for (Iterator queryIterator = myMMRequestSubscription
1120: .iterator(); queryIterator.hasNext();) {
1121: MMQueryRequest request = (MMQueryRequest) queryIterator
1122: .next();
1123:
1124: if (request.getQuery() instanceof MMRoleQuery) {
1125: message = message
1126: .concat("\n MMQueryRequest exists for "
1127: + ((MMRoleQuery) request.getQuery())
1128: .getRole());
1129: } else {
1130: message = message
1131: .concat("\n MMQueryRequest non-role based query ");
1132: }
1133:
1134: if (request.getResult() == null) {
1135: message = message.concat(", but no reply.");
1136: } else {
1137: message = message.concat(", and reply exists with "
1138: + request.getResult().size()
1139: + " possible providers.");
1140: }
1141: }
1142: return message;
1143: }
1144:
1145: // See if we have all contracts for all the roles we need. If so, set
1146: // FindProviders conf to 1. Otherwise, to 0.
1147: private void updateFindProvidersTaskDispositions() {
1148: // Print verbose status of all requests
1149: if (myLoggingService.isInfoEnabled()) {
1150: myLoggingService.info(getAgentIdentifier()
1151: + ": updateFindProvidersTaskDispositions() "
1152: + stateMessage());
1153: }
1154:
1155: if (myFindProvidersTaskSubscription.isEmpty()) {
1156: // Nothing to update
1157: return;
1158: }
1159: double conf = getConfidenceForFindProviders();
1160:
1161: // Did we just find our providers? This will signal
1162: // whether the agent should persiste
1163: boolean justFoundProviders = false;
1164:
1165: //Only request persist if in initial FindProviders stage
1166: boolean doingInitialFindProviders = true;
1167:
1168: if (myFindProvidersTaskSubscription.isEmpty()) {
1169: myLoggingService.warn(getAgentIdentifier()
1170: + " huh? no find providers task yet?");
1171: }
1172: // Now update the confidence on the FindProviders task
1173: for (Iterator iterator = myFindProvidersTaskSubscription
1174: .iterator(); iterator.hasNext();) {
1175: Task task = (Task) iterator.next();
1176: Set oStages = getOplanStages(task);
1177: if (oStages == null || oStages.size() != 1)
1178: doingInitialFindProviders = false;
1179:
1180: PlanElement pe = task.getPlanElement();
1181:
1182: AllocationResult estResult = PluginHelper
1183: .createEstimatedAllocationResult(task, theLDMF,
1184: conf, true);
1185: if (pe == null) {
1186: Disposition disposition = theLDMF.createDisposition(
1187: task.getPlan(), task, estResult);
1188: if (myLoggingService.isInfoEnabled()) {
1189: myLoggingService
1190: .info(getAgentIdentifier()
1191: + ": updateFindProvidersTaskDispositions: create disposition with conf "
1192: + conf);
1193: }
1194: publishAdd(disposition);
1195:
1196: // Are we in the first findProviders stage? Then if conf = 1,
1197: // justFoundProviders
1198: // needToFindProviders() says we've received the FindProvider task.
1199: // but I don't want to persist with every oplan stage change
1200: if ((PERSIST_EARLY) && (doingInitialFindProviders)
1201: && (conf == 1.0)) {
1202: justFoundProviders = true;
1203: if (myLoggingService.isInfoEnabled()) {
1204: myLoggingService
1205: .info(getAgentIdentifier()
1206: + ": updateFindProvidersTaskDispositions - "
1207: + " added a conf 1.0 disposition to findProviders while doingInitialFindProviders - going to request persistence.");
1208: }
1209: }
1210: } else {
1211: if (conf != pe.getEstimatedResult()
1212: .getConfidenceRating()) {
1213: if (myLoggingService.isInfoEnabled()) {
1214: myLoggingService
1215: .info(getAgentIdentifier()
1216: + ": updateFindProvidersTaskDispositions() changed conf from "
1217: + pe.getEstimatedResult()
1218: .getConfidenceRating()
1219: + " to " + conf + " for "
1220: + task.getUID());
1221: } // end if logging block
1222:
1223: pe.setEstimatedResult(estResult);
1224: publishChange(pe);
1225:
1226: // if conf = 1.0 and we're in the first findProviders stage
1227: // then justFoundProviders
1228: // needToFindProviders says this is at least the findProviders,
1229: // but I don't want to persist with every oplan stage change
1230: if ((PERSIST_EARLY) && (doingInitialFindProviders)
1231: && (conf == 1.0)) {
1232: justFoundProviders = true;
1233: if (myLoggingService.isInfoEnabled())
1234: myLoggingService
1235: .info(getAgentIdentifier()
1236: + " just changed findProviders dispo to 1.0 while doingInitialFindProviders - going to request persistence.");
1237: }
1238:
1239: } else if (myLoggingService.isInfoEnabled()) {
1240: myLoggingService
1241: .info("not updating ER confidence (" + conf
1242: + ") for find providers task "
1243: + task.getUID());
1244: }
1245: } // end of block to change PE conf
1246: } // end of loop over FindProviders tasks
1247:
1248: if (conf == 1.0) {
1249: resetWarningCutoffTime();
1250: }
1251:
1252: handlePersistEarly(justFoundProviders);
1253:
1254: } // end of method
1255:
1256: /**
1257: * Planning time defaults to 1970-2500 A.D.
1258: *
1259: * @return 1.0 if the contracts completely cover all of planning time, 0.0 otherwise
1260: */
1261: private double getConfidenceForFindProviders() {
1262: double conf = 1.0;
1263:
1264: // First determine whether we have found all our providers
1265:
1266: // Look to see if we've got all our contracts for roles
1267: // specified as plugin parameters. If any one is missing,
1268: // then overall confidence stays 0
1269: Collection roleParams = parseRoleParams();
1270:
1271: if (myLoggingService.isInfoEnabled()) {
1272: myLoggingService
1273: .info(getAgentIdentifier()
1274: + ": getConfidenceForFindProviders: contracts found - ");
1275: }
1276:
1277: for (Iterator iterator = roleParams.iterator(); iterator
1278: .hasNext();) {
1279: Role role = Role.getRole((String) iterator.next());
1280: boolean foundContract = checkProviderCompletelyCovered(
1281: role, getOPCONTimeSpan());
1282:
1283: if (!foundContract) {
1284: if (myLoggingService.isInfoEnabled()) {
1285: myLoggingService.info(" no contract yet for role "
1286: + role + " over time " + getOPCONTimeSpan()
1287: + ". Contracts were :\n"
1288: + reportOnContracts());
1289: }
1290:
1291: conf = 0.0;
1292: break;
1293: } else {
1294: if (myLoggingService.isInfoEnabled()) {
1295: myLoggingService.info(getAgentIdentifier()
1296: + " found contract for role " + role);
1297: }
1298: }
1299: // end of block where found no contract for a Role
1300: } // end of loop over Roles passed in as parameters
1301: return conf;
1302: }
1303:
1304: private String reportOnContracts() {
1305: StringBuffer buf = new StringBuffer();
1306: int i = 0;
1307: for (Iterator relayIterator = myServiceContractRelaySubscription
1308: .iterator(); relayIterator.hasNext();) {
1309: ServiceContractRelay relay = (ServiceContractRelay) relayIterator
1310: .next();
1311: ServiceContract contract = relay.getServiceContract();
1312: buf.append("\t");
1313: buf.append(i++);
1314: buf.append(" : ");
1315: buf
1316: .append((contract == null ? " no contract for relay "
1317: + relay
1318: : contract.getServiceRole()
1319: + " by "
1320: + contract.getProvider()
1321: + "[ "
1322: + getPrefs(contract
1323: .getServicePreferences())
1324: + " ]"));
1325:
1326: buf.append("\n");
1327: }
1328: return buf.toString();
1329: }
1330:
1331: String getPrefs(Collection prefs) {
1332: String result = "";
1333: for (Iterator iterator = prefs.iterator(); iterator.hasNext();) {
1334: Preference preference = (Preference) iterator.next();
1335: result += preference.getAspectType() == 0 ? "START" : "END";
1336: result += " "
1337: + dateString(new Date((long) preference
1338: .getScoringFunction().getBest().getValue()));
1339: }
1340: return result;
1341: }
1342:
1343: public String dateString(Date date) {
1344: SimpleDateFormat dateTimeFormat_ = new SimpleDateFormat(
1345: "MM/dd/yyyy HH:mm:ss.SSS z");
1346: String sdate = dateTimeFormat_.format(date);
1347: // mape '9/8/00 12:00 AM' to ' 9/8/00 12:00 AM'
1348: while (sdate.length() < 17) {
1349: sdate = " " + sdate;
1350: }
1351: return sdate;
1352: }
1353:
1354: protected Role getRole(ServiceDescription serviceDescription) {
1355:
1356: for (Iterator iterator = serviceDescription
1357: .getServiceClassifications().iterator(); iterator
1358: .hasNext();) {
1359: ServiceClassification serviceClassification = (ServiceClassification) iterator
1360: .next();
1361: if (serviceClassification.getClassificationSchemeName()
1362: .equals(UDDIConstants.MILITARY_SERVICE_SCHEME)) {
1363: Role role = Role.getRole(serviceClassification
1364: .getClassificationName());
1365: return role;
1366: }
1367: }
1368: return null;
1369:
1370: }
1371:
1372: protected Set getOplanStages(Task task) {
1373: Enumeration origpp = task.getPrepositionalPhrases();
1374:
1375: while (origpp.hasMoreElements()) {
1376: PrepositionalPhrase app = (PrepositionalPhrase) origpp
1377: .nextElement();
1378: if (app.getPreposition().equals(
1379: Constants.Prepositions.FOR_OPLAN_STAGES)) {
1380: return (Set) app.getIndirectObject();
1381: }
1382: }
1383:
1384: return Collections.EMPTY_SET;
1385: }
1386:
1387: protected long getWarningCutoffTime() {
1388: if (myWarningCutoffTime == -1) {
1389: WARNING_SUPPRESSION_INTERVAL = Integer.getInteger(
1390: CLIENT_GRACE_PERIOD, WARNING_SUPPRESSION_INTERVAL)
1391: .intValue();
1392: myWarningCutoffTime = System.currentTimeMillis()
1393: + (WARNING_SUPPRESSION_INTERVAL * 60000);
1394: }
1395:
1396: return myWarningCutoffTime;
1397: }
1398:
/**
 * Marks the warning cutoff time as unset (-1) so that the next call to
 * getWarningCutoffTime() computes a fresh value.
 */
protected void resetWarningCutoffTime() {
    myWarningCutoffTime = -1;
}
1402:
1403: protected int getDesiredNumberOfProviders(Role role) {
1404: Integer desiredProviders = ((Integer) myRoles.get(role));
1405: if (desiredProviders != null) {
1406: return desiredProviders.intValue();
1407: } else {
1408: return 1;
1409: }
1410: }
1411:
/**
 * Returns the OPCON schedule currently held in the myOPCONSchedule field.
 *
 * @return the stored OPCON schedule
 */
protected NonOverlappingTimeSpanSet getOPCONSchedule() {
    return myOPCONSchedule;
}
1415:
/**
 * Stores the given OPCON schedule in the myOPCONSchedule field, replacing
 * whatever was held before.
 *
 * @see org.cougaar.servicediscovery.plugin.SDClientPlugin#setupSubscriptions()
 * @see SDClientPlugin#handleChangedLineage()
 * @param opconSchedule the schedule to store
 */
protected void setOPCONSchedule(
        NonOverlappingTimeSpanSet opconSchedule) {
    myOPCONSchedule = opconSchedule;
}
1425:
1426: /**
1427: * @see #setupSubscriptions()
1428: * @see #handleChangedLineage
1429: * @return TimeSpanSet
1430: */
1431: protected NonOverlappingTimeSpanSet buildOPCONSchedule() {
1432: NonOverlappingTimeSpanSet opconSchedule = new NonOverlappingTimeSpanSet();
1433:
1434: for (Iterator iterator = myLineageSubscription.iterator(); iterator
1435: .hasNext();) {
1436: Lineage lineage = (Lineage) iterator.next();
1437:
1438: if (lineage.getType() == Lineage.OPCON) {
1439: List lineageSchedule = new ArrayList(lineage
1440: .getSchedule());
1441:
1442: if (myLoggingService.isDebugEnabled()) {
1443: myLoggingService.debug(getAgentIdentifier()
1444: + " buildOPCONSchedule() - "
1445: + " found OPCON lineage " + lineage);
1446: }
1447:
1448: for (Iterator scheduleIterator = lineageSchedule
1449: .iterator(); scheduleIterator.hasNext();) {
1450: try {
1451: ScheduleElement element = (ScheduleElement) scheduleIterator
1452: .next();
1453: opconSchedule.add(new LineageTimeSpan(lineage,
1454: element.getStartTime(), element
1455: .getEndTime()));
1456: } catch (IllegalArgumentException iae) {
1457: // Handle as a transient error - we're looking at lineages while the LineagePlugin
1458: // is modifying. Next publish change of a Lineage will put us back in this code -
1459: // hopefully with consistent lineages.
1460: if (myLoggingService.isWarnEnabled()) {
1461: myLoggingService
1462: .warn(getAgentIdentifier()
1463: + " Overlapping OPCON lineages - "
1464: + lineageSchedule
1465: + " - will retry the next time the lineage subscription changes.");
1466: }
1467:
1468: // Return empty schedule since I have no way of knowing what's correct.
1469: opconSchedule = new NonOverlappingTimeSpanSet();
1470: break;
1471: }
1472: }
1473: }
1474: }
1475:
1476: if (myLoggingService.isDebugEnabled()) {
1477: myLoggingService.debug(getAgentIdentifier()
1478: + " buildOPCONSchedule() " + " current schedule - "
1479: + opconSchedule);
1480: }
1481:
1482: return opconSchedule;
1483: }
1484:
1485: protected boolean matchingRequest(MMQueryRequest request,
1486: Role role, TimeSpan timeSpan, ServiceInfoScorer scorer) {
1487: if (!(request.getQuery() instanceof MMRoleQuery)) {
1488: return false;
1489: }
1490:
1491: MMRoleQuery query = (MMRoleQuery) request.getQuery();
1492:
1493: if (query.getObsolete()) {
1494: return false;
1495: }
1496:
1497: MMRoleQuery newQuery = new MMRoleQuery(role, scorer, timeSpan);
1498:
1499: if (request.getResult() == null) {
1500: return (query.equals(newQuery));
1501: } else {
1502: return false;
1503: }
1504: }
1505:
1506: private void handlePersistEarly(boolean justFoundProviders) {
1507: // If the agent just finished finding providers, force a persistence now
1508: // that way, if the agent dies anytime after this, it will come
1509: // back with its providers intact
1510: if (PERSIST_EARLY && justFoundProviders) {
1511: try {
1512: // Bug 3282: Persistence doesn't put in the correct reasons for
1513: // blocking in SchedulableStatus
1514: getBlackboardService().persistNow();
1515:
1516: // Now send a Cougaar event indicating the agent has its providers
1517: // and has persisted them.
1518: if (myEventService != null
1519: && myEventService.isEventEnabled()) {
1520: myEventService.event(getAgentIdentifier()
1521: + " persisted after finding providers.");
1522: } else if (myLoggingService.isInfoEnabled()) {
1523: myLoggingService
1524: .info(getAgentIdentifier()
1525: + " (no event service): persisted after finding providers.");
1526: }
1527: } catch (PersistenceNotEnabledException nope) {
1528: if (myEventService != null
1529: && myEventService.isEventEnabled()) {
1530: myEventService
1531: .event(getAgentIdentifier()
1532: + " finished finding providers (persistence not enabled).");
1533: } else if (myLoggingService.isInfoEnabled()) {
1534: myLoggingService
1535: .info(getAgentIdentifier()
1536: + " (no event service): finished finding providers (persistence not enabled).");
1537: }
1538: } // try/catch block
1539: } // if we just found our providers
1540: }
1541:
1542: public void verifyOutstandingRequests(
1543: TimeSpanSet currentOPCONSchedule) {
1544: for (Iterator requestIterator = myMMRequestSubscription
1545: .iterator(); requestIterator.hasNext();) {
1546: MMQueryRequest request = (MMQueryRequest) requestIterator
1547: .next();
1548: MMRoleQuery query = (MMRoleQuery) request.getQuery();
1549: boolean obsolete = false;
1550:
1551: if (myLoggingService.isDebugEnabled()) {
1552: myLoggingService.debug(getAgentIdentifier()
1553: + ": verifyOutstandingRequests() "
1554: + " iterating over MMQueryRequests, request = "
1555: + request.getUID() + " query = " + query);
1556: }
1557:
1558: if (query.getObsolete()) {
1559: if (myLoggingService.isDebugEnabled()) {
1560: myLoggingService.debug(getAgentIdentifier()
1561: + ": verifyOutstandingRequests() "
1562: + " request = " + request.getUID()
1563: + " already marked as obsolete");
1564: }
1565: continue;
1566: }
1567:
1568: TimeSpan requestTimeSpan = query.getTimeSpan();
1569: Lineage requestLineage = getCommandLineage(requestTimeSpan);
1570:
1571: if (requestLineage == null) {
1572: if (myLoggingService.isDebugEnabled()) {
1573: myLoggingService.debug(getAgentIdentifier()
1574: + ": verifyOutstandingRequests() "
1575: + " request = " + request.getUID()
1576: + " no longer matches the command lineage."
1577: + "Marking as obsolete");
1578: }
1579: obsolete = true;
1580: } else {
1581: TimeSpanSet currentMatches = new TimeSpanSet(
1582: currentOPCONSchedule
1583: .intersectingSet(requestTimeSpan));
1584:
1585: if ((currentMatches.isEmpty())
1586: || (((TimeSpan) currentMatches.first())
1587: .getStartTime() > requestTimeSpan
1588: .getStartTime())
1589: || (((TimeSpan) currentMatches.last())
1590: .getEndTime() < requestTimeSpan
1591: .getEndTime())) {
1592: if (myLoggingService.isDebugEnabled()) {
1593: myLoggingService
1594: .debug(getAgentIdentifier()
1595: + ": verifyOutstandingRequest() "
1596: + " iterating over MMQueryRequest, "
1597: + " request = "
1598: + request.getUID()
1599: + " for "
1600: + new Date(requestTimeSpan
1601: .getStartTime())
1602: + " - "
1603: + new Date(requestTimeSpan
1604: .getEndTime())
1605: + " no longer has an OPCON. Marking as obsolete");
1606: }
1607: obsolete = true;
1608: } else {
1609: long currentLatest = TimeSpan.MAX_VALUE;
1610:
1611: // check for lineage change
1612: for (Iterator currentIterator = currentMatches
1613: .iterator(); currentIterator.hasNext();) {
1614: LineageTimeSpan currentTimeSpan = (LineageTimeSpan) currentIterator
1615: .next();
1616:
1617: // Check that OPCONs are continguous
1618: if (currentTimeSpan.getStartTime() > currentLatest) {
1619: //opcon gap
1620: if (myLoggingService.isDebugEnabled()) {
1621: myLoggingService
1622: .debug(getAgentIdentifier()
1623: + ": verifyOutstandingRequests() "
1624: + " OPCON gap starting at "
1625: + new Date(
1626: currentLatest)
1627: + " - marking MMQueryRequest = "
1628: + request.getUID()
1629: + " as obsolete.");
1630: }
1631:
1632: obsolete = true;
1633: break;
1634: } else {
1635: currentLatest = currentTimeSpan
1636: .getEndTime();
1637: }
1638:
1639: if (!requestLineage.getList().equals(
1640: currentTimeSpan.getLineage().getList())) {
1641: if (myLoggingService.isDebugEnabled()) {
1642: myLoggingService
1643: .debug(getAgentIdentifier()
1644: + ": verifyOutstandingRequest() "
1645: + " marking MMQueryRequest = "
1646: + request.getUID()
1647: + " as obsolete."
1648: + "Request lineage = "
1649: + requestLineage
1650: .getList()
1651: + " does not equal current lineage = "
1652: + currentTimeSpan
1653: .getLineage()
1654: .getList());
1655: }
1656:
1657: obsolete = true;
1658: break;
1659: }
1660: }
1661: }
1662: }
1663:
1664: if (obsolete) {
1665: if (myLoggingService.isDebugEnabled()) {
1666: myLoggingService.debug(getAgentIdentifier()
1667: + ": verifyOutstandingRequest() "
1668: + " publish change MMQueryRequest = "
1669: + request.getUID());
1670: }
1671: query.setObsolete(true);
1672: publishChange(request);
1673: }
1674: }
1675: }
1676:
/**
 * Re-validates this agent's service contract relays against the supplied
 * OPCON schedule, removing or shortening those that no longer fit.
 * <p>
 * Only relays with a non-null, non-revoked contract whose client equals
 * getSelfOrg() are examined. For each of them the time span of the
 * original request is used:
 * <ul>
 * <li>if getCommandLineage() returns null for that span, the relay is
 * removed;</li>
 * <li>if no OPCON schedule entry intersects the span, or the first
 * intersecting entry starts after the span's start, the relay is
 * removed;</li>
 * <li>if two successive intersecting entries are not contiguous, the
 * request is shortened to end where the earlier entry ends;</li>
 * <li>if an intersecting entry's lineage list differs from the command
 * lineage's list, the request is shortened to end where that entry starts
 * - or, when that entry does not start after the request's start, the
 * relay is removed.</li>
 * </ul>
 * Shortening replaces the time span preferences on the request and
 * publishes a change of the relay.
 *
 * @param currentOPCONSchedule OPCON schedule to validate against
 */
public void verifyServiceContracts(TimeSpanSet currentOPCONSchedule) {
    for (Iterator relayIterator = myServiceContractRelaySubscription
            .iterator(); relayIterator.hasNext();) {
        ServiceContractRelay relay = (ServiceContractRelay) relayIterator
                .next();

        if (myLoggingService.isDebugEnabled()) {
            myLoggingService
                    .debug(getAgentIdentifier()
                            + ": verifyServiceContracts() "
                            + " iterating over service contract relays, relay = "
                            + relay);
        }

        ServiceContract contract = relay.getServiceContract();
        // Relays without a live contract, or sent by someone else, are not
        // touched here.
        if ((contract != null) && (!contract.isRevoked())
                && (relay.getClient().equals(getSelfOrg()))) {

            // Look at request for lineage because that shows what we asked for.
            ServiceRequest request = relay.getServiceRequest();
            TimeSpan requestTimeSpan = SDFactory
                    .getTimeSpanFromPreferences(request
                            .getServicePreferences());

            Lineage requestLineage = getCommandLineage(requestTimeSpan);

            if (requestLineage == null) {
                if (myLoggingService.isDebugEnabled()) {
                    myLoggingService
                            .debug(getAgentIdentifier()
                                    + ": verifyServiceContracts() "
                                    + " unable to find original lineage for request = "
                                    + request);
                }
                publishRemove(relay);
                continue;
            }

            // OPCON schedule entries that intersect the requested span.
            TimeSpanSet currentMatches = new TimeSpanSet(
                    currentOPCONSchedule
                            .intersectingSet(requestTimeSpan));

            // NOTE(review): unlike verifyOutstandingRequests(), the end of
            // the last match is not compared with the request's end here -
            // confirm that this is intended.
            if ((currentMatches.isEmpty())
                    || (((TimeSpan) currentMatches.first())
                            .getStartTime() > requestTimeSpan
                            .getStartTime())) {
                if (myLoggingService.isDebugEnabled()) {
                    myLoggingService
                            .debug(getAgentIdentifier()
                                    + ": verifyServiceContracts() "
                                    + " iterating over service contract relays, removing relay = "
                                    + relay + " requestTimeSpan = "
                                    + requestTimeSpan
                                    + " no longer has an OPCON.");
                }
                publishRemove(relay);
                continue;
            }

            // Stays null unless the request has to be shortened.
            Collection timeSpanPreferences = null;
            TimeSpan newRequestTimeSpan = null;

            // Starts at MAX_VALUE so the first entry can never be reported
            // as a gap.
            long currentLatest = TimeSpan.MAX_VALUE;

            // check for lineage change
            for (Iterator currentIterator = currentMatches
                    .iterator(); currentIterator.hasNext();) {
                LineageTimeSpan currentTimeSpan = (LineageTimeSpan) currentIterator
                        .next();

                // Check that OPCONs are contiguous
                if (currentTimeSpan.getStartTime() > currentLatest) {
                    // opcon gap
                    if (myLoggingService.isDebugEnabled()) {
                        myLoggingService.debug(getAgentIdentifier()
                                + ": verifyServiceContracts() "
                                + " OPCON gap starting at "
                                + new Date(currentLatest)
                                + " - resetting contract end to "
                                + new Date(currentLatest));
                    }

                    // change pref: new end time = currentLatest, i.e. the
                    // end of the entry before the gap
                    newRequestTimeSpan = TimeSpans.getSpan(
                            requestTimeSpan.getStartTime(),
                            currentLatest);
                    timeSpanPreferences = mySDFactory
                            .createTimeSpanPreferences(newRequestTimeSpan);
                    break;
                } else {
                    currentLatest = currentTimeSpan.getEndTime();
                }

                if (!requestLineage.getList().equals(
                        currentTimeSpan.getLineage().getList())) {
                    // adjust contract if possible
                    if (currentTimeSpan.getStartTime() > requestTimeSpan
                            .getStartTime()) {
                        if (myLoggingService.isDebugEnabled()) {
                            myLoggingService
                                    .debug(getAgentIdentifier()
                                            + ": verifyServiceContracts() "
                                            + " resetting contract end to "
                                            + currentTimeSpan
                                                    .getStartTime());
                        }

                        // change pref: new end time = currentTimeSpan.getStartTime()
                        newRequestTimeSpan = TimeSpans.getSpan(
                                requestTimeSpan.getStartTime(),
                                currentTimeSpan.getStartTime());

                        timeSpanPreferences = mySDFactory
                                .createTimeSpanPreferences(newRequestTimeSpan);
                        break;
                    } else {
                        // The lineage differs from the request's start on,
                        // so nothing of the request can be kept.
                        if (myLoggingService.isDebugEnabled()) {
                            myLoggingService
                                    .debug(getAgentIdentifier()
                                            + ": verifyServiceContracts() "
                                            + " iterating over service contract relays, removing relay = "
                                            + relay
                                            + " requestLineage - "
                                            + requestLineage
                                                    .getList()
                                            + " - != currentLineage - "
                                            + currentTimeSpan
                                                    .getLineage()
                                                    .getList());
                        }
                        publishRemove(relay);
                    }
                    break;
                }
            }

            if (timeSpanPreferences != null) {
                // Work on copies of the request's preferences, keyed by
                // aspect type, and overwrite the entries for the aspect
                // types of the new time span preferences.
                HashMap requestPreferences = copyPreferences(request
                        .getServicePreferences());

                for (Iterator timeSpanIterator = timeSpanPreferences
                        .iterator(); timeSpanIterator.hasNext();) {

                    Preference timeSpanPreference = (Preference) timeSpanIterator
                            .next();
                    requestPreferences.put(new Integer(
                            timeSpanPreference.getAspectType()),
                            timeSpanPreference);
                }

                if (myLoggingService.isDebugEnabled()) {
                    TimeSpan timeSpan = SDFactory
                            .getTimeSpanFromPreferences(requestPreferences
                                    .values());
                    myLoggingService
                            .debug(getAgentIdentifier()
                                    + ": changing time span on service request - "
                                    + relay
                                    + " to "
                                    + new Date(timeSpan
                                            .getStartTime())
                                    + " - "
                                    + new Date(timeSpan
                                            .getEndTime()));
                }

                // The request is modified in place through its
                // implementation class, then the relay is re-published.
                ((ServiceRequestImpl) request)
                        .setServicePreferences(requestPreferences
                                .values());
                publishChange(relay);
            }
        }
    }
}
1851:
1852: private HashMap copyPreferences(Collection preferences) {
1853: HashMap preferenceMap = new HashMap(preferences.size());
1854:
1855: for (Iterator iterator = preferences.iterator(); iterator
1856: .hasNext();) {
1857: Preference original = (Preference) iterator.next();
1858:
1859: Preference copy = getFactory()
1860: .newPreference(original.getAspectType(),
1861: original.getScoringFunction(),
1862: original.getWeight());
1863: preferenceMap.put(new Integer(copy.getAspectType()), copy);
1864: }
1865:
1866: return preferenceMap;
1867: }
1868: }
|