001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.servicediscovery.plugin;
028:
029: import java.io.File;
030: import java.util.*;
031: import java.text.DateFormat;
032: import java.text.SimpleDateFormat;
033: import java.text.ParseException;
034:
035: import org.cougaar.core.agent.service.alarm.Alarm;
036: import org.cougaar.core.blackboard.IncrementalSubscription;
037: import org.cougaar.core.plugin.ComponentPlugin;
038: import org.cougaar.core.service.AgentIdentificationService;
039: import org.cougaar.core.service.DomainService;
040: import org.cougaar.core.service.LoggingService;
041: import org.cougaar.core.service.QuiescenceReportService;
042: import org.cougaar.planning.ldm.PlanningFactory;
043: import org.cougaar.planning.ldm.plan.AllocationResult;
044: import org.cougaar.planning.ldm.plan.Disposition;
045: import org.cougaar.planning.ldm.plan.PlanElement;
046: import org.cougaar.planning.ldm.plan.Role;
047: import org.cougaar.planning.ldm.plan.Schedule;
048: import org.cougaar.planning.ldm.plan.ScheduleElement;
049: import org.cougaar.planning.ldm.plan.ScheduleElementImpl;
050: import org.cougaar.planning.ldm.plan.Task;
051: import org.cougaar.planning.plugin.util.PluginHelper;
052: import org.cougaar.servicediscovery.SDDomain;
053: import org.cougaar.servicediscovery.SDFactory;
054: import org.cougaar.servicediscovery.Constants;
055: import org.cougaar.servicediscovery.description.AvailabilityChangeMessage;
056: import org.cougaar.servicediscovery.description.Lineage;
057: import org.cougaar.servicediscovery.description.ProviderCapabilities;
058: import org.cougaar.servicediscovery.description.ProviderCapability;
059: import org.cougaar.servicediscovery.description.ProviderDescription;
060: import org.cougaar.servicediscovery.description.ProviderDescriptionImpl;
061: import org.cougaar.servicediscovery.description.ServiceCategory;
062: import org.cougaar.servicediscovery.description.ServiceClassification;
063: import org.cougaar.servicediscovery.description.ServiceClassificationImpl;
064: import org.cougaar.servicediscovery.description.ServiceProfile;
065: import org.cougaar.servicediscovery.service.RegistrationService;
066: import org.cougaar.servicediscovery.util.UDDIConstants;
067: import org.cougaar.util.TimeSpan;
068: import org.cougaar.util.UnaryPredicate;
069:
/**
 * Reads the local agent's OWL profile file and registers this agent in the YP
 * under the roles listed in that file.
 **/
074: public abstract class SDRegistrationPluginBase extends ComponentPlugin {
075:
/** Availability start used when no AvailabilityStart parameter is supplied. */
protected static final long DEFAULT_START = TimeSpan.MIN_VALUE;
/** Availability end used when no AvailabilityEnd parameter is supplied. */
protected static final long DEFAULT_END = TimeSpan.MAX_VALUE;

/** Suffix appended to the agent identifier to form the OWL profile file name. */
protected static final String OWL_IDENTIFIER = ".profile.owl";

// Services handed to the plugin through the set*Service methods below.
protected LoggingService log;
protected RegistrationService registrationService = null;
protected DomainService domainService = null;
protected QuiescenceReportService quiescenceReportService = null;

// Blackboard subscriptions created in setupSubscriptions().
protected IncrementalSubscription supportLineageSubscription;
protected IncrementalSubscription availabilityChangeSubscription;
protected IncrementalSubscription registerTaskSubscription;

// Grace period in minutes; overwritten from REGISTRATION_GRACE_PERIOD_PROPERTY
// the first time getWarningCutOffTime() computes the cutoff.
protected static int WARNING_SUPPRESSION_INTERVAL = 5;
// 0 means "not computed yet"; resetWarningCutoffTime() sets it to -1.
protected long warningCutoffTime = 0;
protected static final String REGISTRATION_GRACE_PERIOD_PROPERTY = "org.cougaar.servicediscovery.plugin.RegistrationGracePeriod";
// Returned by parseInitialTime() when the init time property is missing or unparseable.
private static final long DATE_ERROR = Long.MIN_VALUE;
private static final long HOUR_IN_MILLIS = 3600000;
private static final long DAY_IN_MILLIS = 86400000;

// Most recently scheduled retry alarm (see retryErrorLog); cancelled in suspend().
protected Alarm retryAlarm;
// Number of known SCAs, taken from the plugin parameters.
protected int knownSCAs = 0;
// NOTE: evaluated during construction, i.e. before any service setter has run.
long initialTime = parseInitialTime(); //should be 8/10/05 00:05:00
// Availability offsets from the plugin parameters, in hours.
long parsedAvailStart = DEFAULT_START;
long parsedAvailEnd = DEFAULT_END;

// True when the blackboard reported a rehydration in setupSubscriptions().
protected boolean rehydrated;

// Cached parsed OWL description; see getPD() and clearPD().
protected ProviderDescription provD = null;
// True while a ProviderCapabilities object still has to be published.
protected boolean publishProviderCapabilities;
107:
108: private UnaryPredicate supportLineagePredicate = new UnaryPredicate() {
109: public boolean execute(Object o) {
110: return ((o instanceof Lineage) && (((Lineage) o).getType() == Lineage.SUPPORT));
111: }
112: };
113:
114: private UnaryPredicate availabilityChangePredicate = new UnaryPredicate() {
115: public boolean execute(Object o) {
116: return (o instanceof AvailabilityChangeMessage);
117: }
118: };
119:
120: private UnaryPredicate registerTaskPredicate = new UnaryPredicate() {
121: public boolean execute(Object o) {
122: return ((o instanceof Task) && (((Task) o).getVerb()
123: .equals(Constants.Verbs.RegisterServices)));
124: }
125: };
126:
127: private UnaryPredicate providerCapabilitiesPredicate = new UnaryPredicate() {
128: public boolean execute(Object o) {
129: if (o instanceof ProviderCapabilities) {
130: ProviderCapabilities providerCapabilities = (ProviderCapabilities) o;
131: return (providerCapabilities.getProviderName()
132: .equals(getAgentIdentifier().toString()));
133: } else {
134: return false;
135: }
136: }
137: };
138:
139: public void setDomainService(DomainService ds) {
140: domainService = ds;
141: }
142:
143: public void setLoggingService(LoggingService log) {
144: this .log = log;
145: }
146:
147: public void setQuiescenceReportService(QuiescenceReportService qrs) {
148: quiescenceReportService = qrs;
149:
150: }
151:
152: public void setRegistrationService(RegistrationService rs) {
153: registrationService = rs;
154: }
155:
156: public AgentIdentificationService getAgentIdentificationService() {
157: // Service established down in BlackboardClientComponent
158: return agentIdentificationService;
159: }
160:
161: public void suspend() {
162: if (log.isInfoEnabled()) {
163: log.info(getAgentIdentifier() + " suspend.");
164: }
165: super .suspend();
166:
167: if (retryAlarm != null) {
168: if (log.isInfoEnabled()) {
169: log.info(getAgentIdentifier()
170: + " cancelling retryAlarm.");
171: }
172: retryAlarm.cancel();
173: }
174: }
175:
176: public void load() {
177: super .load();
178:
179: if (getAgentIdentificationService() != null) {
180: quiescenceReportService
181: .setAgentIdentificationService(getAgentIdentificationService());
182: }
183: }
184:
185: public void unload() {
186: if (registrationService != null) {
187: getBindingSite().getServiceBroker().releaseService(this ,
188: RegistrationService.class, registrationService);
189: registrationService = null;
190: }
191:
192: /* Quiescence reporting support should we decide we need it */
193: if (quiescenceReportService != null) {
194: getBindingSite().getServiceBroker().releaseService(this ,
195: QuiescenceReportService.class,
196: quiescenceReportService);
197: quiescenceReportService = null;
198: }
199:
200: if ((log != null) && (log != LoggingService.NULL)) {
201: getBindingSite().getServiceBroker().releaseService(this ,
202: LoggingService.class, log);
203: log = null;
204: }
205: super .unload();
206: }
207:
208: protected void setupSubscriptions() {
209: supportLineageSubscription = (IncrementalSubscription) getBlackboardService()
210: .subscribe(supportLineagePredicate);
211: availabilityChangeSubscription = (IncrementalSubscription) getBlackboardService()
212: .subscribe(availabilityChangePredicate);
213: registerTaskSubscription = (IncrementalSubscription) getBlackboardService()
214: .subscribe(registerTaskPredicate);
215:
216: Collection params = getParameters();
217: // Optional parameters to this plugin are the number of known sca's - i.e. an int
218: // and the start and end time representing the availability of a provider
219: // i.e. AvailabilityStart:0 and AvailabilityEnd:14
220: // where the value is an offset from C0 (i.e. 8/15/00 00:00:00) in even hour increments.
221: // Currently does NOT handle non even hour offsets.
222:
223: if (params.size() > 0) {
224: Iterator it = params.iterator();
225: while (it.hasNext()) {
226: String nextParam = (String) it.next();
227: //String numStr = (String) params.iterator().next();
228: if (!nextParam.startsWith("Availability")) {
229: try {
230: knownSCAs = Integer.parseInt(nextParam);
231: } catch (NumberFormatException nfe) {
232: knownSCAs = 0;
233: log.error(getAgentIdentifier()
234: + " invalid SCA count parameter - "
235: + nextParam, nfe);
236: }
237: } else {
238: int tokenIndex = nextParam.indexOf(":");
239: String availabilityParam = nextParam
240: .substring(tokenIndex + 1);
241: long parsedAvailability = Long
242: .parseLong(availabilityParam);
243: if (nextParam.startsWith("AvailabilityStart")) {
244: parsedAvailStart = parsedAvailability;
245: } else {
246: parsedAvailEnd = parsedAvailability;
247: }
248: }
249: }
250: } else {
251: knownSCAs = 0;
252: }
253:
254: rehydrated = getBlackboardService().didRehydrate();
255:
256: if (rehydrated) {
257: Collection pcCollection = getBlackboardService().query(
258: providerCapabilitiesPredicate);
259:
260: publishProviderCapabilities = (pcCollection.isEmpty());
261: } else {
262: publishProviderCapabilities = true;
263: }
264: }
265:
266: protected void execute() {
267: if (isProvider()) {
268:
269: if (publishProviderCapabilities) {
270: ProviderCapabilities providerCapabilities = createProviderCapabilities();
271:
272: if (providerCapabilities != null) {
273: getBlackboardService().publishAdd(
274: providerCapabilities);
275: publishProviderCapabilities = false;
276: } else {
277: retryErrorLog("Problem getting ProviderDescription, try again later.");
278: }
279: }
280:
281: if (availabilityChangeSubscription.hasChanged()) {
282: Collection adds = availabilityChangeSubscription
283: .getAddedCollection();
284: handleAvailabilityChange(adds);
285: }
286: }
287: }
288:
289: /* ProviderDescription is big - release resources if we don't need it
290: * anymore.
291: */
292: protected void clearPD() {
293: if ((provD != null) && (log.isDebugEnabled())) {
294: log.debug(getAgentIdentifier() + ": clearPD()");
295: }
296:
297: provD = null;
298: }
299:
300: /* Returns null if unable to parse the provider description */
301: protected ProviderDescription getPD() {
302: if (provD == null) {
303: if (log.isDebugEnabled()) {
304: log.debug(getAgentIdentifier()
305: + ": getPD() parsing OWL.");
306: }
307:
308: ProviderDescription pd = new ProviderDescriptionImpl();
309: try {
310: boolean ok = pd.parseOWL(getAgentIdentifier()
311: + OWL_IDENTIFIER);
312:
313: if (ok && (pd.getProviderName() != null)) {
314: if (log.isDebugEnabled()) {
315: log.debug(getAgentIdentifier()
316: + ": getPD() successfully parsed OWL.");
317: }
318:
319: provD = pd;
320: } else {
321: if (log.isDebugEnabled()) {
322: log.debug(getAgentIdentifier()
323: + ": getPD() unable to parse OWL."
324: + " ok = " + ok);
325: }
326: }
327: } catch (java.util.ConcurrentModificationException cme) {
328: // Jena can do a concurrent mod exception. See bug 3052
329: // Leave provD uninitialized
330: if (log.isDebugEnabled()) {
331: log
332: .debug(getAgentIdentifier()
333: + ": getPD() ConcurrentModificationException - "
334: + cme);
335: }
336: }
337: }
338: return provD;
339: }
340:
341: protected long getWarningCutOffTime() {
342: if (warningCutoffTime == 0) {
343: WARNING_SUPPRESSION_INTERVAL = Integer.getInteger(
344: REGISTRATION_GRACE_PERIOD_PROPERTY,
345: WARNING_SUPPRESSION_INTERVAL).intValue();
346: warningCutoffTime = System.currentTimeMillis()
347: + WARNING_SUPPRESSION_INTERVAL * 60000;
348: }
349:
350: return warningCutoffTime;
351: }
352:
353: protected void resetWarningCutoffTime() {
354: warningCutoffTime = -1;
355: }
356:
357: protected void retryErrorLog(String message) {
358: retryErrorLog(message, null);
359: }
360:
361: // When an error occurs, but we'll be retrying later, treat it as a DEBUG
362: // at first. After a while it becomes an error.
363: protected void retryErrorLog(String message, Throwable e) {
364:
365: long absTime = getAlarmService().currentTimeMillis()
366: + (int) (Math.random() * 10000) + 1000;
367:
368: retryAlarm = new RetryAlarm(absTime);
369: getAlarmService().addAlarm(retryAlarm);
370:
371: if (System.currentTimeMillis() > getWarningCutOffTime()) {
372: if (e == null)
373: log.error(getAgentIdentifier() + message);
374: else
375: log.error(getAgentIdentifier() + message, e);
376: } else if (log.isDebugEnabled()) {
377: if (e == null)
378: log.debug(getAgentIdentifier() + message);
379: else
380: log.debug(getAgentIdentifier() + message, e);
381: }
382: }
383:
384: protected Collection scaServiceClassifications(
385: Collection supportLineageCollection) {
386: Collection serviceClassifications = new ArrayList(
387: supportLineageCollection.size());
388: for (Iterator iterator = supportLineageCollection.iterator(); iterator
389: .hasNext();) {
390: Lineage lineage = (Lineage) iterator.next();
391: ServiceClassification sca = new ServiceClassificationImpl(
392: lineage.getRoot(), lineage.getRoot(),
393: UDDIConstants.SUPPORT_COMMAND_ASSIGNMENT);
394: serviceClassifications.add(sca);
395: }
396: return serviceClassifications;
397: }
398:
/**
 * Implemented by subclasses. This class treats a true result as "registration
 * is done": the agent is reported quiescent and RegisterServices tasks get a
 * confidence of 1.0.
 */
protected abstract boolean registrationComplete();
400:
401: /* Returns initial version of ProviderCapabilities created from the
402: * provider OWL file.
403: */
404: protected ProviderCapabilities createProviderCapabilities() {
405: ProviderDescription pd = getPD();
406:
407: if (pd == null) {
408: return null;
409: }
410:
411: long localC0 = initialTime + (5 * DAY_IN_MILLIS) - 300000; //take off 5 minutes to get to midnight
412:
413: long availabilityStart = localC0
414: + (parsedAvailStart * HOUR_IN_MILLIS);
415: long availabilityEnd = localC0
416: + (parsedAvailEnd * HOUR_IN_MILLIS);
417:
418: Collection serviceProfiles = pd.getServiceProfiles();
419:
420: PlanningFactory planningFactory = (PlanningFactory) domainService
421: .getFactory("planning");
422: SDFactory sdFactory = (SDFactory) domainService
423: .getFactory(SDDomain.SD_NAME);
424: ProviderCapabilities providerCapabilities = sdFactory
425: .newProviderCapabilities(getAgentIdentifier()
426: .toString());
427:
428: for (Iterator iterator = serviceProfiles.iterator(); iterator
429: .hasNext();) {
430: ServiceProfile serviceProfile = (ServiceProfile) iterator
431: .next();
432:
433: Collection serviceCategories = serviceProfile
434: .getServiceCategories();
435:
436: Role role = null;
437: String echelon = null;
438:
439: for (Iterator scIterator = serviceCategories.iterator(); scIterator
440: .hasNext();) {
441: ServiceCategory serviceCategory = (ServiceCategory) scIterator
442: .next();
443:
444: String scheme = serviceCategory.getCategorySchemeName();
445: if (scheme
446: .equals(UDDIConstants.MILITARY_ECHELON_SCHEME)) {
447: echelon = serviceCategory.getCategoryName();
448: } else if (scheme
449: .equals(UDDIConstants.MILITARY_SERVICE_SCHEME)) {
450: role = Role.getRole(serviceCategory
451: .getCategoryName());
452: }
453:
454: if ((role != null) && (echelon != null)) {
455: Schedule defaultSchedule = null;
456: if (parsedAvailStart == DEFAULT_START) {
457: defaultSchedule = planningFactory
458: .newSimpleSchedule(DEFAULT_START,
459: DEFAULT_END);
460: } else {
461: if (log.isInfoEnabled()) {
462: log.info("availabilityStart is "
463: + new Date(availabilityStart)
464: + " and availabilityEnd is "
465: + new Date(availabilityEnd)
466: + "for agent "
467: + getAgentIdentifier());
468: }
469: defaultSchedule = planningFactory
470: .newSimpleSchedule(availabilityStart,
471: availabilityEnd);
472: }
473:
474: providerCapabilities.addCapability(role, echelon,
475: defaultSchedule);
476: break;
477: }
478: }
479: }
480:
481: return providerCapabilities;
482: }
483:
484: private boolean reregistrationKludgeNeeded(Task task) {
485: PlanElement pe = task.getPlanElement();
486: // BOZO - special rehydration kludge for quiesence monitor.
487: // Plugin will reregister because it doesn't know whether the previous
488: // registration exists but we don't want to perturb quiescence.
489: return ((rehydrated) && (pe != null) && (pe
490: .getEstimatedResult().getConfidenceRating() == 1.0));
491: }
492:
493: protected void updateQuiescenceService() {
494: if (quiescenceReportService != null) {
495: if (registrationComplete()) {
496: // Tell the Q Service I'm quiescent
497: quiescenceReportService.setQuiescentState();
498: if (log.isInfoEnabled()) {
499: log
500: .info(getAgentIdentifier()
501: + " done with SDRegistration. Now quiescent.");
502: }
503: } else if (registerTaskSubscription.isEmpty()) {
504: // no point in checking for the reregistration kludge
505: quiescenceReportService.clearQuiescentState();
506: if (log.isInfoEnabled()) {
507: log
508: .info(getAgentIdentifier()
509: + " waiting to complete registration - not quiescent.");
510: }
511: return;
512: } else {
513: for (Iterator iterator = registerTaskSubscription
514: .iterator(); iterator.hasNext();) {
515: Task task = (Task) iterator.next();
516:
517: if (!reregistrationKludgeNeeded(task)) {
518: // May be waiting on a callback or a community or an SCA. Say not Q
519: quiescenceReportService.clearQuiescentState();
520: if (log.isInfoEnabled()) {
521: log
522: .info(getAgentIdentifier()
523: + " waiting to complete registration - not quiescent.");
524: }
525: return;
526: }
527: }
528:
529: // Getting here means that all the registerTasks require the
530: // reregistation kludge - special 'fix' for quiesence
531: // monitor. Plugin will reregister on rehydration because it doesn't
532: // know whether the previous registration still exists but we don't
533: // want to perturb quiescence state.
534: quiescenceReportService.setQuiescentState();
535: if (log.isInfoEnabled()) {
536: log
537: .info(getAgentIdentifier()
538: + ": updateQuiescenceService() "
539: + " setting quiescent state even though reregistration "
540: + " after rehydration is not complete\n"
541: + " rehydrated = " + rehydrated
542: + " register tasks = "
543: + registerTaskSubscription);
544: }
545: }
546: }
547: }
548:
549: protected void updateRegisterTaskDispositions() {
550: PlanningFactory planningFactory = (PlanningFactory) domainService
551: .getFactory("planning");
552:
553: for (Iterator iterator = registerTaskSubscription.iterator(); iterator
554: .hasNext();) {
555: Task task = (Task) iterator.next();
556: PlanElement pe = task.getPlanElement();
557: double conf;
558: if (!registrationComplete()) {
559: // BOZO - special rehydration kludge for quiesence monitor.
560: // Plugin will reregister because it doesn't know whether the previous
561: // registration exists but does not downgrade the confidence as the
562: // change would propagate via GLSExpander back to NCA.
563: if (reregistrationKludgeNeeded(task)) {
564: if (log.isWarnEnabled()) {
565: log
566: .warn(getAgentIdentifier()
567: + ": updateRegisterTaskDisposition() "
568: + "leaving confidence at 1.0 after rehydration even though "
569: + "reregistration is not complete.");
570: }
571: conf = 1.0;
572: } else {
573: if ((pe != null)
574: && (pe.getEstimatedResult()
575: .getConfidenceRating() == 1.0)) {
576: if (log.isDebugEnabled()) {
577: log
578: .debug(getAgentIdentifier()
579: + ": updateRegisterTaskDisposition() "
580: + "changing confidence back to 0.0."
581: + " rehydrated == "
582: + rehydrated);
583: }
584: }
585: conf = 0.0;
586: }
587: } else {
588: // ProviderDescription is big - release resources since we're
589: // done with registration
590: clearPD();
591:
592: conf = 1.0;
593: }
594:
595: AllocationResult estResult = PluginHelper
596: .createEstimatedAllocationResult(task,
597: planningFactory, conf, true);
598:
599: if (pe == null) {
600: if (log.isInfoEnabled()) {
601: log
602: .info(getAgentIdentifier()
603: + " adding a disposition to RegisterServices task, confidence rating is "
604: + conf);
605: }
606: Disposition disposition = planningFactory
607: .createDisposition(task.getPlan(), task,
608: estResult);
609: getBlackboardService().publishAdd(disposition);
610: } else if (pe.getEstimatedResult().getConfidenceRating() != conf) {
611: double previousConf = pe.getEstimatedResult()
612: .getConfidenceRating();
613:
614: // If we're backing up from a Confidence of 1.0 to lower, like at rehydration
615: // when we don't know whether the YP lost our registration,
616: // be a little more verbose.
617: if (previousConf == 1.0) {
618: log
619: .warn(getAgentIdentifier()
620: + " SDRegistrationPlugin is "
621: + " changing RegisterServices confidence rating from "
622: + previousConf + " to " + conf
623: + ". Rehydrated?");
624: } else if (log.isInfoEnabled()) {
625: log
626: .info(getAgentIdentifier()
627: + " changing RegisterServices confidence rating from "
628: + previousConf + " to " + conf);
629: }
630: pe.setEstimatedResult(estResult);
631: getBlackboardService().publishChange(pe);
632: }
633:
634: if (conf == 1.0) {
635: resetWarningCutoffTime();
636: }
637: }
638: }
639:
640: private long parseInitialTime() {
641: String propertyName = "org.cougaar.initTime";
642: long date = DATE_ERROR;
643: long time = DATE_ERROR;
644: String value = System.getProperty(propertyName);
645: if (value != null) {
646: try {
647: DateFormat f = (new SimpleDateFormat(
648: "MM/dd/yyy H:mm:ss"));
649: f.setTimeZone(TimeZone.getTimeZone("GMT"));
650: time = f.parse(value).getTime();
651: // get midnight of specified date
652: Calendar c = f.getCalendar();
653: c.setTimeInMillis(time);
654: c.set(Calendar.HOUR, 0);
655: c.set(Calendar.MINUTE, 0);
656: c.set(Calendar.SECOND, 0);
657: c.set(Calendar.MILLISECOND, 0);
658: date = c.getTimeInMillis();
659: } catch (ParseException e) {
660: // try with just the date
661: try {
662: DateFormat f = (new SimpleDateFormat("MM/dd/yyy"));
663: f.setTimeZone(TimeZone.getTimeZone("GMT"));
664: time = f.parse(value).getTime();
665: } catch (ParseException e1) {
666: if (log.isDebugEnabled())
667: log.debug("Failed to parse property "
668: + propertyName
669: + " as date+time or just time: "
670: + value, e1);
671: }
672: }
673: }
674:
675: return time;
676: }
677:
678: // Handle a change to our registration status
679: protected void handleAvailabilityChange(
680: AvailabilityChangeMessage availabilityChange) {
681: synchronized (availabilityChange) {
682: if (!isProvider()) {
683: if (log.isDebugEnabled()) {
684: log.debug(getAgentIdentifier()
685: + " not a provider. "
686: + "Ignoring AvailabiltyChangeMessage - "
687: + availabilityChange);
688: }
689: return;
690: }
691:
692: if (log.isDebugEnabled()) {
693: log
694: .debug(getAgentIdentifier()
695: + " handling AvailabiltyChangeMessage. Status = "
696: + availabilityChange.getStatus());
697: }
698:
699: switch (availabilityChange.getStatus()) {
700: case AvailabilityChangeMessage.REQUESTED:
701: updateProviderCapability(availabilityChange);
702: availabilityChange
703: .setStatus(AvailabilityChangeMessage.COMPLETED);
704: getBlackboardService()
705: .publishChange(availabilityChange);
706: break;
707: case AvailabilityChangeMessage.PENDING:
708: // let it go. might want to check to see if it takes a very long time...
709: break;
710: case AvailabilityChangeMessage.COMPLETED:
711: if (!availabilityChange.isRegistryUpdated()) {
712: availabilityChange.setRegistryUpdated(true);
713: getBlackboardService().publishChange(
714: availabilityChange);
715: }
716: break;
717: case AvailabilityChangeMessage.DONE:
718: // should drop it from the list;
719: break;
720: case AvailabilityChangeMessage.ERROR:
721: // retry, perhaps?
722: break;
723: }
724: }
725: }
726:
727: protected void handleAvailabilityChange(
728: Collection availabilityChanges) {
729: for (Iterator iterator = availabilityChanges.iterator(); iterator
730: .hasNext();) {
731: AvailabilityChangeMessage availabilityChange = (AvailabilityChangeMessage) iterator
732: .next();
733: handleAvailabilityChange(availabilityChange);
734: }
735:
736: return;
737: }
738:
/**
 * Applies an availability change to this agent's ProviderCapabilities.
 *
 * For every ProviderCapabilities object of this agent that has a capability
 * for the message's role, a new schedule is built from the current one:
 * - no overlap with the message's time span: an element covering the span
 *   is added if the provider becomes available, otherwise nothing changes;
 * - overlap: all overlapping elements are removed and replaced either by
 *   one merged element (available) or by the parts of the first/last
 *   overlapping element that lie outside the span (unavailable).
 * When something changed, the capability is updated and published, and the
 * registered role is removed (new schedule empty) or added (old schedule
 * was empty).
 */
protected void updateProviderCapability(
        AvailabilityChangeMessage availabilityChange) {
    Collection pcCollection = getBlackboardService().query(
            providerCapabilitiesPredicate);

    if (log.isDebugEnabled()) {
        log.debug(getAgentIdentifier()
                + ": updateProviderCapability handling "
                + availabilityChange);
    }

    for (Iterator iterator = pcCollection.iterator(); iterator
            .hasNext();) {
        ProviderCapabilities capabilities = (ProviderCapabilities) iterator
                .next();

        ProviderCapability capability = capabilities
                .getCapability(availabilityChange.getRole());

        if (capability != null) {
            TimeSpan timeSpan = availabilityChange.getTimeSpan();
            Schedule currentAvailability = capability
                    .getAvailableSchedule();

            if (log.isDebugEnabled()) {
                log
                        .debug(getAgentIdentifier()
                                + ": found matching ProviderCapability handling "
                                + capability
                                + " with current availability "
                                + currentAvailability);
            }

            // Work on a copy; currentAvailability is only read from here on.
            PlanningFactory planningFactory = (PlanningFactory) domainService
                    .getFactory("planning");
            Schedule newAvailability = planningFactory
                    .newSchedule(currentAvailability
                            .getAllScheduleElements());

            Collection overlaps = currentAvailability
                    .getOverlappingScheduleElements(timeSpan
                            .getStartTime(), timeSpan.getEndTime());

            if (log.isDebugEnabled()) {
                log.debug(getAgentIdentifier()
                        + ": overlaps found - " + overlaps);
            }

            boolean change = false;
            if (overlaps.size() == 0) {
                // Becoming unavailable in a span that was never available is a no-op.
                if (availabilityChange.isAvailable()) {
                    change = true;
                    // Construct ScheduleElement to fill the entire period.
                    ScheduleElementImpl newScheduleElement = new ScheduleElementImpl(
                            timeSpan.getStartTime(), timeSpan
                                    .getEndTime());
                    newAvailability.add(newScheduleElement);
                }
            } else {
                change = true;

                // earliest = first overlap in iteration order, latest = last.
                // NOTE(review): this presumably relies on the overlaps being
                // returned in time order - confirm against the Schedule contract.
                ScheduleElement earliest = null;
                ScheduleElement latest = null;

                for (Iterator overlap = overlaps.iterator(); overlap
                        .hasNext();) {
                    latest = (ScheduleElement) overlap.next();
                    if (earliest == null) {
                        earliest = latest;
                    }
                    newAvailability.remove(latest);
                }

                if (availabilityChange.isAvailable()) {
                    // Construct ScheduleElement to fill the entire period.
                    long newStart = Math.min(earliest
                            .getStartTime(), timeSpan
                            .getStartTime());
                    long newEnd = Math.max(latest.getEndTime(),
                            timeSpan.getEndTime());
                    ScheduleElementImpl newScheduleElement = new ScheduleElementImpl(
                            newStart, newEnd);
                    newAvailability.add(newScheduleElement);
                } else {
                    // Construct ScheduleElements to bracket the unavailable time

                    // Keep the part of the first overlap that precedes the span.
                    if (earliest.getStartTime() < timeSpan
                            .getStartTime()) {
                        ScheduleElementImpl newEarliest = new ScheduleElementImpl(
                                earliest.getStartTime(), timeSpan
                                        .getStartTime());
                        newAvailability.add(newEarliest);
                    }

                    // Keep the part of the last overlap that follows the span.
                    if (latest.getEndTime() > timeSpan.getEndTime()) {
                        ScheduleElementImpl newLatest = new ScheduleElementImpl(
                                timeSpan.getEndTime(), latest
                                        .getEndTime());
                        newAvailability.add(newLatest);
                    }
                }
            }

            if (change) {
                if (log.isDebugEnabled()) {
                    log
                            .debug(getAgentIdentifier()
                                    + " provider availability after change "
                                    + newAvailability);
                }

                capability.setAvailableSchedule(newAvailability);
                getBlackboardService().publishChange(capabilities);

                if (newAvailability.size() == 0) {
                    // Provider never available so remove registration
                    removeRegisteredRole(availabilityChange);
                } else if (currentAvailability.size() == 0) {
                    // Provider changing from never available so add registration
                    addRegisteredRole(availabilityChange);
                }

            } else {
                if (log.isDebugEnabled()) {
                    log
                            .debug(getAgentIdentifier()
                                    + " ignoring AvailabilityChangeMessage "
                                    + availabilityChange
                                    + " information already included in ProviderCapability "
                                    + capability);
                }
            }
        }
    }
}
874:
/**
 * Implemented by subclasses. Called by updateProviderCapability() when an
 * availability change turns a previously empty availability schedule into
 * a non-empty one ("changing from never available so add registration").
 */
protected abstract void addRegisteredRole(
        final AvailabilityChangeMessage availabilityChange);
877:
/**
 * Implemented by subclasses. Called by updateProviderCapability() when an
 * availability change leaves the availability schedule empty
 * ("never available so remove registration").
 */
protected abstract void removeRegisteredRole(
        final AvailabilityChangeMessage availabilityChange);
880:
881: // Is this Agent a service provider?
882: protected boolean isProvider() {
883: return getProviderFile().exists();
884: }
885:
886: // Get the OWL service provider file
887: protected File getProviderFile() {
888: String owlFileName = getAgentIdentifier().toString()
889: + OWL_IDENTIFIER;
890: return new File(org.cougaar.servicediscovery.Constants
891: .getServiceProfileURL().getFile()
892: + owlFileName);
893: }
894:
895: public class RetryAlarm implements Alarm {
896: private long expiresAt;
897: private boolean expired = false;
898:
899: public RetryAlarm(long expirationTime) {
900: expiresAt = expirationTime;
901: }
902:
903: public long getExpirationTime() {
904: return expiresAt;
905: }
906:
907: public synchronized void expire() {
908: if (!expired) {
909: expired = true;
910: getBlackboardService().signalClientActivity();
911: }
912: }
913:
914: public boolean hasExpired() {
915: return expired;
916: }
917:
918: public synchronized boolean cancel() {
919: boolean was = expired;
920: expired = true;
921: return was;
922: }
923:
924: public String toString() {
925: return "<RetryAlarm " + expiresAt
926: + (expired ? "(Expired) " : " ")
927: + "for SDCommunityBasedRegistrationPlugin at "
928: + getAgentIdentifier() + ">";
929: }
930: }
931: }
|