001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.servicediscovery.plugin;
028:
029: import java.util.ArrayList;
030: import java.util.Collection;
031:
032: import org.cougaar.core.service.LoggingService;
033: import org.cougaar.servicediscovery.description.AvailabilityChangeMessage;
034: import org.cougaar.servicediscovery.description.ProviderDescription;
035: import org.cougaar.servicediscovery.description.ServiceClassification;
036: import org.cougaar.servicediscovery.description.ServiceClassificationImpl;
037: import org.cougaar.servicediscovery.service.RegistrationService;
038: import org.cougaar.servicediscovery.util.UDDIConstants;
039:
040: /**
041: * Read local agent OWL profile file. Use the listed roles and register this agent with those
042: * roles in the YP.
043: **/
044: public class SDRegistrationPlugin extends SDRegistrationPluginBase {
045: private static String ypAgent = System.getProperty(
046: "org.cougaar.yp.ypAgent", "OSD.GOV");
047:
048: private int outstandingSCAUpdates = 0;
049:
050: private boolean isRegistered = false;
051:
052: /** true iff we have submitted a registration request which hasn't yet been observed to complete **/
053: private boolean inProgress = false;
054: /** true iff we've got the callback successfully, but need to notify everyone else **/
055: private boolean isPending = false;
056:
057: public void load() {
058: super .load();
059: }
060:
061: public void unload() {
062: super .unload();
063: }
064:
065: protected void setupSubscriptions() {
066: super .setupSubscriptions();
067:
068: // For now re-register every time.
069: isRegistered = false;
070: }
071:
072: protected void execute() {
073: super .execute();
074:
075: if (isProvider()) {
076: boolean registrationInitiated = false;
077:
078: if (isPending && !inProgress) {
079: isRegistered = true;
080: isPending = false;
081:
082: if (log.isInfoEnabled()) {
083: log.info(getAgentIdentifier()
084: + " completed initial registration.");
085: }
086:
087: if (outstandingSCAUpdates != 0) {
088: Collection scas = supportLineageSubscription
089: .getCollection();
090: if (log.isInfoEnabled()) {
091: log.info(getAgentIdentifier()
092: + " post pending added new SCAs - "
093: + scas);
094: }
095:
096: // Set outstanding count to 1 because we're adding all known SCAs
097: outstandingSCAUpdates = 1;
098: updateRegistration(scas);
099: }
100: } else if (!isRegistered) {
101: if (supportLineageSubscription.size() >= knownSCAs) {
102: if (log.isDebugEnabled()) {
103: log.debug(getAgentIdentifier()
104: + " registering with SCAs "
105: + supportLineageSubscription);
106: }
107: registrationInitiated = true;
108: initialRegister();
109: } else {
110: if (log.isDebugEnabled()) {
111: log.debug(getAgentIdentifier()
112: + " waiting to register - " + " need "
113: + knownSCAs + " SCAs, have "
114: + supportLineageSubscription.size());
115: }
116: }
117: }
118:
119: if (supportLineageSubscription.hasChanged()) {
120: Collection adds = supportLineageSubscription
121: .getAddedCollection();
122:
123: if (!adds.isEmpty()) {
124:
125: if ((log.isInfoEnabled())
126: && (supportLineageSubscription.size() > knownSCAs)) {
127: log.info(getAgentIdentifier() + " expected "
128: + knownSCAs + " received "
129: + supportLineageSubscription.size()
130: + " " + supportLineageSubscription);
131: }
132:
133: if (isRegistered) {
134: outstandingSCAUpdates++;
135: if (log.isInfoEnabled()) {
136: log.info(getAgentIdentifier()
137: + " added new SCAs - " + adds);
138: }
139: updateRegistration(adds);
140: } else if (inProgress && !registrationInitiated) {
141: outstandingSCAUpdates++;
142: if (log.isInfoEnabled()) {
143: log
144: .info(getAgentIdentifier()
145: + " skip adding new SCAs - "
146: + adds);
147: }
148: }
149: }
150: }
151: }
152:
153: // No harm since publish change only occurs if the conf rating has changed.
154: updateRegisterTaskDispositions();
155:
156: // This PI is capable of exiting the execute method while still having
157: // work to do -- hence it must tell the QuiescenceReportService
158: // that it has outstanding work
159: updateQuiescenceService();
160: }
161:
162: private void updateRegistration(Collection scas) {
163: final Collection adds = scas;
164:
165: // IMPORTANT - currently no code to retry. Failed transactions == lost SCAs
166: registrationService.updateServiceDescription(ypAgent,
167: getAgentIdentifier().toString(),
168: scaServiceClassifications(adds),
169: new RegistrationService.Callback() {
170: public void invoke(Object o) {
171: // Success or not, we've completed
172: // handling this transaction.
173: outstandingSCAUpdates--;
174: boolean success = ((Boolean) o).booleanValue();
175: if (!success) {
176: log
177: .error(getAgentIdentifier()
178: + " unable to update registry with new SCAs - "
179: + adds);
180: } else {
181:
182: if (log.isInfoEnabled()) {
183: log
184: .info(getAgentIdentifier()
185: + " updated registry with new SCAs - "
186: + adds);
187: }
188: }
189: getBlackboardService().signalClientActivity();
190: }
191:
192: public void handle(Exception e) {
193: // Success or not, we've completed
194: // handling this transaction.
195: outstandingSCAUpdates--;
196: retryErrorLog("updateRegistration", e);
197: }
198: });
199:
200: }
201:
202: private void initialRegister() {
203: if (isPending)
204: return; // skip if we're just waiting to notify - of course, then we shouldn't be here
205:
206: if (isProvider()) {
207: final ProviderDescription pd = getPD();
208:
209: if (pd == null) {
210: // Unable to get provider description
211: retryErrorLog("Problem getting ProviderDescription now, try again later.");
212: return;
213: }
214:
215: if (inProgress) {
216: if (log.isInfoEnabled()) {
217: log.info(getAgentIdentifier()
218: + " still waiting for register");
219: }
220: } else {
221: inProgress = true; // don't allow another one to start
222: outstandingSCAUpdates = 0; // will pick up all known SCAs
223:
224: // callback will be executed by another thread
225: RegistrationService.Callback cb = new RegistrationService.Callback() {
226: public void invoke(Object o) {
227: boolean success = ((Boolean) o).booleanValue();
228: if (log.isInfoEnabled()) {
229: log.info(pd.getProviderName()
230: + " initialRegister success = "
231: + success);
232: }
233:
234: isPending = true; // let the plugin set isRegistered
235: retryAlarm = null; // probably not safe
236: inProgress = false; // ok to update
237: getBlackboardService().signalClientActivity();
238: }
239:
240: public void handle(Exception e) {
241: inProgress = false; // ok to update
242: retryErrorLog("initialRegister", e);
243: }
244: };
245:
246: // actually submit the request.
247: if (log.isInfoEnabled()) {
248: log.info(getAgentIdentifier()
249: + " initial registration with SCAs "
250: + supportLineageSubscription);
251: }
252: registrationService
253: .addProviderDescription(
254: ypAgent,
255: pd,
256: scaServiceClassifications(supportLineageSubscription),
257: cb);
258: }
259: } else {
260: if (log.isDebugEnabled()) {
261: log.debug(getAgentIdentifier()
262: + " not registering, no owl file.");
263: }
264: }
265: }
266:
267: protected boolean registrationComplete() {
268: //return ((outstandingSCAUpdates == 0) && (isRegistered));
269: return (!isProvider()) || (isRegistered);
270: }
271:
272: protected void removeRegisteredRole(
273: final AvailabilityChangeMessage availabilityChange) {
274: Collection serviceClassifications = new ArrayList(1);
275: ServiceClassification sca = new ServiceClassificationImpl(
276: availabilityChange.getRole().toString(),
277: availabilityChange.getRole().toString(),
278: UDDIConstants.MILITARY_SERVICE_SCHEME);
279: serviceClassifications.add(sca);
280:
281: RegistrationService.Callback cb = new RegistrationService.Callback() {
282: public void invoke(Object o) {
283: if (log.isDebugEnabled()) {
284: log.debug(getAgentIdentifier()
285: + " removed yp registration for role "
286: + availabilityChange.getRole());
287: }
288: getBlackboardService().signalClientActivity();
289: }
290:
291: public void handle(Exception e) {
292: synchronized (availabilityChange) {
293: availabilityChange
294: .setStatus(AvailabilityChangeMessage.ERROR);
295: }
296: retryErrorLog("removeRegisteredRole", e);
297: }
298: };
299: registrationService.deleteServiceDescription(ypAgent,
300: getAgentIdentifier().toString(),
301: serviceClassifications, cb);
302: }
303:
304: protected void addRegisteredRole(
305: final AvailabilityChangeMessage availabilityChange) {
306: Collection serviceClassifications = new ArrayList(1);
307: ServiceClassification sca = new ServiceClassificationImpl(
308: availabilityChange.getRole().toString(),
309: availabilityChange.getRole().toString(),
310: UDDIConstants.MILITARY_SERVICE_SCHEME);
311: serviceClassifications.add(sca);
312:
313: RegistrationService.Callback cb = new RegistrationService.Callback() {
314: public void invoke(Object o) {
315: if (log.isDebugEnabled()) {
316: log.debug(getAgentIdentifier()
317: + " added yp registration for role "
318: + availabilityChange.getRole());
319: }
320: getBlackboardService().signalClientActivity();
321: }
322:
323: public void handle(Exception e) {
324: synchronized (availabilityChange) {
325: availabilityChange
326: .setStatus(AvailabilityChangeMessage.ERROR);
327: }
328: retryErrorLog("addRegisteredRole", e);
329: }
330: };
331: registrationService.updateServiceDescription(ypAgent,
332: getAgentIdentifier().toString(),
333: serviceClassifications, cb);
334: }
335: }
|