001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.servicediscovery.plugin;
028:
029: import java.util.ArrayList;
030: import java.util.Collection;
031: import java.util.HashMap;
032: import java.util.Iterator;
033: import java.util.Map;
034: import java.util.Set;
035:
036: import org.cougaar.core.service.community.Community;
037: import org.cougaar.core.service.community.CommunityChangeEvent;
038: import org.cougaar.core.service.community.CommunityChangeListener;
039: import org.cougaar.core.service.community.CommunityResponse;
040: import org.cougaar.core.service.community.CommunityResponseListener;
041: import org.cougaar.core.service.community.CommunityService;
042: import org.cougaar.servicediscovery.description.AvailabilityChangeMessage;
043: import org.cougaar.servicediscovery.description.Lineage;
044: import org.cougaar.servicediscovery.description.ProviderDescription;
045: import org.cougaar.servicediscovery.description.ServiceClassification;
046: import org.cougaar.servicediscovery.description.ServiceClassificationImpl;
047: import org.cougaar.servicediscovery.service.RegistrationService;
048: import org.cougaar.servicediscovery.util.UDDIConstants;
049:
050: /**
051: * Read local agent OWL profile file. Use the listed roles and register this agent with those
052: * roles in the YP.
053: **/
054: public class SDCommunityBasedRegistrationPlugin extends
055: SDRegistrationPluginBase {
056: private CommunityService communityService = null;
057:
058: private HashMap scaHash = null;
059:
060: public void setCommunityService(CommunityService cs) {
061: this .communityService = cs;
062: }
063:
064: public void suspend() {
065: super .suspend();
066:
067: // Remove all community change notifications
068: Set scaSet = scaHash.entrySet();
069:
070: if (log.isInfoEnabled()) {
071: log.info(getAgentIdentifier()
072: + " removing community change listeners.");
073: }
074:
075: for (Iterator iterator = scaSet.iterator(); iterator.hasNext();) {
076: Map.Entry entry = (Map.Entry) iterator.next();
077:
078: ((SCAInfo) entry.getValue()).clearCommunity();
079: }
080: }
081:
082: protected void setupSubscriptions() {
083: super .setupSubscriptions();
084:
085: scaHash = new HashMap();
086:
087: if (rehydrated) {
088: // Currently losing all pre-existing status change messages.
089: // rebuild scaHash & reregister
090: for (Iterator iterator = supportLineageSubscription
091: .iterator(); iterator.hasNext();) {
092: Lineage sca = (Lineage) iterator.next();
093: handleNewSCA(sca);
094: }
095: }
096: }
097:
098: protected void execute() {
099: super .execute();
100:
101: if (isProvider()) {
102: Set scaSet = scaHash.entrySet();
103:
104: for (Iterator iterator = scaSet.iterator(); iterator
105: .hasNext();) {
106: Map.Entry entry = (Map.Entry) iterator.next();
107:
108: SCAInfo scaInfo = (SCAInfo) entry.getValue();
109:
110: if (scaInfo.readyToRegister()) {
111: if (log.isDebugEnabled()) {
112: log.debug("Registering: "
113: + getAgentIdentifier() + " with "
114: + scaInfo.getCommunity());
115: }
116: initialRegister(scaInfo);
117: }
118: }
119:
120: if (supportLineageSubscription.hasChanged()) {
121: Collection adds = supportLineageSubscription
122: .getAddedCollection();
123:
124: if (adds.size() > 0) {
125: for (Iterator iterator = adds.iterator(); iterator
126: .hasNext();) {
127: Lineage sca = (Lineage) iterator.next();
128: handleNewSCA(sca);
129: }
130:
131: // Adds sca to all preexisting registrations.
132: //updateRegistration(adds);
133: }
134: }
135: }
136:
137: // No harm since publish change only occurs if the conf rating has changed.
138: updateRegisterTaskDispositions();
139:
140: // This PI is capable of exiting the execute method while still having
141: // work to do -- hence it must tell the QuiescenceReportService
142: // that it has outstanding work
143: updateQuiescenceService();
144: }
145:
146: private void initialRegister(final SCAInfo scaInfo) {
147: if (isProvider()) {
148:
149: if (!scaInfo.readyToRegister()) {
150: if (log.isDebugEnabled()) {
151: log.debug(getAgentIdentifier()
152: + "Exiting initialRegister early - "
153: + " SCAInfo not ready - " + " community "
154: + scaInfo.getCommunity() + " isRegistered "
155: + scaInfo.getIsRegistered()
156: + " pendingRegistration "
157: + scaInfo.getPendingRegistration()
158: + " isDeleted " + scaInfo.getIsDeleted());
159: }
160:
161: return;
162: }
163:
164: try {
165: final ProviderDescription pd = getPD();
166:
167: if (pd == null) {
168: scaInfo.setIsRegistered(false);
169: scaInfo.setPendingRegistration(false); // okay to try again
170:
171: retryErrorLog("Problem getting ProviderDescription."
172: + " Unable to add registration to "
173: + scaInfo.getCommunity().getName()
174: + ", try again later.");
175:
176: return;
177: }
178:
179: scaInfo.setPendingRegistration(true);
180:
181: RegistrationService.Callback cb = new RegistrationService.Callback() {
182: public void invoke(Object o) {
183: boolean success = ((Boolean) o).booleanValue();
184: if (log.isInfoEnabled()) {
185: log.info(pd.getProviderName()
186: + " initialRegister success = "
187: + success + " with "
188: + scaInfo.getCommunity().getName());
189: }
190:
191: scaInfo.setIsRegistered(true);
192: scaInfo.setPendingRegistration(false);
193: scaInfo.clearCommunity();
194:
195: retryAlarm = null;
196:
197: org.cougaar.core.service.BlackboardService bbs = getBlackboardService();
198: if (bbs != null) {
199: bbs.signalClientActivity();
200: }
201: }
202:
203: public void handle(Exception e) {
204: scaInfo.setPendingRegistration(false); // okay to try again
205: scaInfo.setIsRegistered(false);
206:
207: retryErrorLog(
208: "Problem adding ProviderDescription to "
209: + scaInfo.getCommunity()
210: .getName()
211: + ", try again later: "
212: + getAgentIdentifier(), e);
213: }
214: };
215:
216: // actually submit the request.
217: registrationService
218: .addProviderDescription(
219: scaInfo.getCommunity(),
220: pd,
221: scaServiceClassifications(supportLineageSubscription),
222: cb);
223: } catch (RuntimeException e) {
224: scaInfo.setIsRegistered(false);
225: scaInfo.setPendingRegistration(false); // okay to try again
226:
227: retryErrorLog("Problem adding ProviderDescription to "
228: + scaInfo.getCommunity().getName()
229: + ", try again later: " + getAgentIdentifier(),
230: e);
231: }
232: }
233: }
234:
235: private void updateRegistration(Collection scas) {
236: final Collection adds = scas;
237:
238: Set scaSet = scaHash.entrySet();
239:
240: for (Iterator iterator = scaSet.iterator(); iterator.hasNext();) {
241: Map.Entry entry = (Map.Entry) iterator.next();
242:
243: final SCAInfo scaInfo = (SCAInfo) entry.getValue();
244:
245: if (scaInfo.getIsRegistered()) {
246: if (log.isDebugEnabled()) {
247: log.debug("Added SCAs - " + scas + " - to "
248: + getAgentIdentifier() + " with "
249: + scaInfo.getCommunity());
250: }
251: RegistrationService.Callback cb = new RegistrationService.Callback() {
252: public void invoke(Object o) {
253: boolean success = ((Boolean) o).booleanValue();
254: if (!success) {
255: log.error(getAgentIdentifier()
256: + " unable to update registry in "
257: + scaInfo.getCommunity()
258: + " with new SCAs - " + adds);
259: } else {
260: if (log.isInfoEnabled()) {
261: log.info(getAgentIdentifier()
262: + " updated registry in "
263: + scaInfo.getCommunity()
264: + " with new SCAs - " + adds);
265: }
266: }
267: }
268:
269: public void handle(Exception e) {
270: retryErrorLog(getAgentIdentifier()
271: + " unable to update registry in "
272: + scaInfo.getCommunity()
273: + " with new SCAs - " + adds
274: + ", try again later", e);
275: }
276: };
277:
278: registrationService.updateServiceDescription(scaInfo
279: .getCommunity(), getAgentIdentifier()
280: .toString(), scaServiceClassifications(adds),
281: cb);
282: }
283: }
284: }
285:
286: protected boolean registrationComplete() {
287: if (!isProvider()) {
288: return true;
289: }
290:
291: // Have all the known SCAs reported in?
292: if (scaHash.size() < knownSCAs) {
293: if (log.isDebugEnabled()) {
294: log.debug(getAgentIdentifier()
295: + " registrationComplete(): scaHash.size() = "
296: + scaHash.size() + " knownSCAs = " + knownSCAs);
297: }
298: return false;
299: }
300:
301: Set scaSet = scaHash.entrySet();
302:
303: for (Iterator iterator = scaSet.iterator(); iterator.hasNext();) {
304: Map.Entry entry = (Map.Entry) iterator.next();
305:
306: SCAInfo scaInfo = (SCAInfo) entry.getValue();
307:
308: if (!scaInfo.getIsRegistered()) {
309: if (log.isDebugEnabled()) {
310: log.debug(getAgentIdentifier()
311: + " registrationComplete(): "
312: + scaInfo.getCommunity()
313: + " is not registered.");
314: }
315: return false;
316: }
317: }
318:
319: return true;
320: }
321:
322: private void handleNewSCA(Lineage sca) {
323: if (log.isDebugEnabled()) {
324: log.debug(getAgentIdentifier() + " adding sca "
325: + sca.getRoot());
326: }
327: SCAInfo scaInfo = (SCAInfo) scaHash.get(sca.getRoot());
328:
329: if (scaInfo == null) {
330: scaInfo = new SCAInfo(null, false, false, false);
331: scaHash.put(sca.getRoot(), scaInfo);
332: }
333:
334: Community scaCommunity = communityService.getCommunity(
335: getYPCommunityName(sca),
336: new SCACommunityResponseListener(scaInfo));
337:
338: if (scaCommunity != null) {
339: scaInfo.setCommunity(scaCommunity);
340: if (log.isDebugEnabled()) {
341: log.debug("Registering: " + getAgentIdentifier()
342: + " with " + scaInfo.getCommunity());
343: }
344: initialRegister(scaInfo);
345: }
346: }
347:
348: protected void addRegisteredRole(
349: final AvailabilityChangeMessage availabilityChange) {
350: Set scaSet = scaHash.entrySet();
351:
352: for (Iterator scaIterator = scaSet.iterator(); scaIterator
353: .hasNext();) {
354: Map.Entry entry = (Map.Entry) scaIterator.next();
355:
356: final SCAInfo scaInfo = (SCAInfo) entry.getValue();
357:
358: if (scaInfo.getIsRegistered()
359: || scaInfo.getPendingRegistration()) {
360: // Add the role
361: Collection serviceClassifications = new ArrayList(1);
362: ServiceClassification sca = new ServiceClassificationImpl(
363: availabilityChange.getRole().toString(),
364: availabilityChange.getRole().toString(),
365: UDDIConstants.MILITARY_SERVICE_SCHEME);
366: serviceClassifications.add(sca);
367:
368: RegistrationService.Callback cb = new RegistrationService.Callback() {
369: public void invoke(Object o) {
370: if (log.isDebugEnabled()) {
371: log
372: .debug(getAgentIdentifier()
373: + " added yp registration for role "
374: + availabilityChange
375: .getRole()
376: + "with "
377: + scaInfo.getCommunity());
378: }
379: {
380: org.cougaar.core.service.BlackboardService bbs = getBlackboardService();
381: if (bbs != null)
382: bbs.signalClientActivity();
383: }
384: }
385:
386: public void handle(Exception e) {
387: synchronized (availabilityChange) {
388: availabilityChange
389: .setStatus(AvailabilityChangeMessage.ERROR);
390: }
391: retryErrorLog("addRegisteredRole()", e);
392: }
393: };
394: registrationService.updateServiceDescription(scaInfo
395: .getCommunity(), getAgentIdentifier()
396: .toString(), serviceClassifications, cb);
397: } else {
398: if (log.isInfoEnabled()) {
399: log
400: .info(getAgentIdentifier()
401: + " unable to add yp registration for role "
402: + availabilityChange.getRole()
403: + "with " + scaInfo.getCommunity());
404: }
405: }
406: }
407: }
408:
409: protected void removeRegisteredRole(
410: final AvailabilityChangeMessage availabilityChange) {
411: Set scaSet = scaHash.entrySet();
412:
413: for (Iterator scaIterator = scaSet.iterator(); scaIterator
414: .hasNext();) {
415: Map.Entry entry = (Map.Entry) scaIterator.next();
416:
417: final SCAInfo scaInfo = (SCAInfo) entry.getValue();
418:
419: if (scaInfo.getIsRegistered()
420: || scaInfo.getPendingRegistration()) {
421: // Delete role
422: Collection serviceClassifications = new ArrayList(1);
423: ServiceClassification sca = new ServiceClassificationImpl(
424: availabilityChange.getRole().toString(),
425: availabilityChange.getRole().toString(),
426: UDDIConstants.MILITARY_SERVICE_SCHEME);
427: serviceClassifications.add(sca);
428:
429: RegistrationService.Callback cb = new RegistrationService.Callback() {
430: public void invoke(Object o) {
431: if (log.isDebugEnabled()) {
432: log
433: .debug(getAgentIdentifier()
434: + " removed yp registration for role "
435: + availabilityChange
436: .getRole()
437: + "with "
438: + scaInfo.getCommunity());
439: }
440: {
441: org.cougaar.core.service.BlackboardService bbs = getBlackboardService();
442: if (bbs != null)
443: bbs.signalClientActivity();
444: }
445: }
446:
447: public void handle(Exception e) {
448: synchronized (availabilityChange) {
449: availabilityChange
450: .setStatus(AvailabilityChangeMessage.ERROR);
451: }
452: retryErrorLog("removeRegisteredRole", e);
453: }
454: };
455:
456: registrationService.deleteServiceDescription(scaInfo
457: .getCommunity(), getAgentIdentifier()
458: .toString(), serviceClassifications, cb);
459: }
460: }
461: }
462:
463: private String getYPCommunityName(Lineage sca) {
464: // For now assume every SCA represented by a YPCommunity called
465: // <sca>-YPCOMMUNITY
466: return sca.getRoot() + "-YPCOMMUNITY";
467: }
468:
469: private class SCAInfo {
470: private Community mySCACommunity;
471: private SCACommunityChangeListener myCommunityListener;
472: private boolean myIsRegistered;
473: private boolean myIsDeleted;
474: private boolean myPendingRegistration;
475:
476: public SCAInfo(Community scaCommunity, boolean isRegistered,
477: boolean pendingRegistration, boolean isDeleted) {
478: mySCACommunity = scaCommunity;
479: myIsRegistered = isRegistered;
480: myPendingRegistration = pendingRegistration;
481: myIsDeleted = isDeleted;
482: }
483:
484: public Community getCommunity() {
485: return mySCACommunity;
486: }
487:
488: public void setCommunity(Community scaCommunity) {
489: if (scaCommunity == null) {
490: clearCommunity();
491: } else {
492: if (mySCACommunity == null) {
493: if (log.isDebugEnabled()) {
494: log.debug(getAgentIdentifier()
495: + " adding listener for "
496: + scaCommunity);
497: }
498: mySCACommunity = scaCommunity;
499:
500: // First time so set up change listener
501: myCommunityListener = new SCACommunityChangeListener(
502: this );
503: communityService.addListener(myCommunityListener);
504: } else {
505: mySCACommunity = scaCommunity;
506: }
507: }
508: }
509:
510: public void clearCommunity() {
511: if (log.isDebugEnabled()) {
512: log.debug(getAgentIdentifier()
513: + " removing listener for " + mySCACommunity);
514: }
515: mySCACommunity = null;
516: if (myCommunityListener != null) {
517: communityService.removeListener(myCommunityListener);
518: }
519: }
520:
521: public boolean getIsRegistered() {
522: return myIsRegistered;
523: }
524:
525: public void setIsRegistered(boolean isRegistered) {
526: if ((myIsRegistered) && (!isRegistered)
527: && (log.isDebugEnabled())) {
528: RuntimeException re = new RuntimeException();
529: log
530: .debug(
531: getAgentIdentifier()
532: + " setIsRegistered() going from true to false.",
533: re);
534: }
535: myIsRegistered = isRegistered;
536: }
537:
538: public boolean getPendingRegistration() {
539: return myPendingRegistration;
540: }
541:
542: public void setPendingRegistration(boolean pendingRegistration) {
543: myPendingRegistration = pendingRegistration;
544: }
545:
546: public boolean getIsDeleted() {
547: return myIsDeleted;
548: }
549:
550: public void setIsDeleted(boolean isDeleted) {
551: myIsDeleted = isDeleted;
552: }
553:
554: public boolean readyToRegister() {
555: return ((getCommunity() != null) && (!getIsRegistered())
556: && (!getPendingRegistration()) && (!getIsDeleted()));
557: }
558: }
559:
560: private class SCACommunityResponseListener implements
561: CommunityResponseListener {
562: private SCAInfo scaInfo;
563:
564: public SCACommunityResponseListener(SCAInfo info) {
565: scaInfo = info;
566: }
567:
568: public void getResponse(CommunityResponse resp) {
569: if (log.isDebugEnabled()) {
570: log.debug(getAgentIdentifier()
571: + " got Community info for "
572: + (Community) resp.getContent());
573: }
574:
575: Community scaCommunity = (Community) resp.getContent();
576:
577: scaInfo.setCommunity(scaCommunity);
578: {
579: org.cougaar.core.service.BlackboardService bbs = getBlackboardService();
580: if (bbs != null)
581: bbs.signalClientActivity();
582: }
583: }
584: }
585:
586: private class SCACommunityChangeListener implements
587: CommunityChangeListener {
588: private SCAInfo scaInfo;
589: String communityName;
590:
591: public SCACommunityChangeListener(SCAInfo info) {
592: scaInfo = info;
593: communityName = scaInfo.getCommunity().getName();
594: }
595:
596: public void communityChanged(CommunityChangeEvent event) {
597: Community scaCommunity = event.getCommunity();
598:
599: // Paranoia code - bug in community code seems to lead to
600: // notifications with null communities.
601: if (scaCommunity == null) {
602: if (log.isDebugEnabled()) {
603: log
604: .debug(getAgentIdentifier()
605: + " received Community change info for a null community");
606: }
607: return;
608: }
609:
610: if (log.isDebugEnabled()) {
611: log.debug(getAgentIdentifier()
612: + " got Community change info for "
613: + scaCommunity);
614: }
615:
616: if (scaCommunity == null) {
617: if (log.isDebugEnabled()) {
618: log
619: .debug(getAgentIdentifier()
620: + " received Community change info for a null community");
621: }
622: return;
623: }
624:
625: if (scaCommunity.getName().equals(getCommunityName())) {
626: scaInfo.setCommunity(scaCommunity);
627:
628: if (scaInfo.readyToRegister()) {
629: if (log.isDebugEnabled()) {
630: log.debug(getAgentIdentifier()
631: + " signalClientActivity for "
632: + scaCommunity);
633: }
634:
635: if (getBlackboardService() == null) {
636: log
637: .warn(getAgentIdentifier()
638: + " ignoring change notification "
639: + " - getBlackboardService() returned null");
640: scaInfo.clearCommunity();
641: } else {
642: org.cougaar.core.service.BlackboardService bbs = getBlackboardService();
643: if (bbs != null)
644: bbs.signalClientActivity();
645: }
646: }
647: } else if (log.isDebugEnabled()) {
648: log.debug(getAgentIdentifier()
649: + " ignoring CommunityChangeEvent for "
650: + scaCommunity.getName()
651: + " - listening for - " + getCommunityName());
652: }
653:
654: }
655:
656: public String getCommunityName() {
657: return communityName;
658: }
659: }
660: }
|