001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.servicediscovery.plugin;
028:
029: import java.util.ArrayList;
030: import java.util.Collection;
031: import java.util.Collections;
032: import java.util.Iterator;
033:
034: import org.cougaar.core.agent.service.alarm.Alarm;
035: import org.cougaar.core.blackboard.IncrementalSubscription;
036: import org.cougaar.core.plugin.ComponentPlugin;
037: import org.cougaar.core.service.LoggingService;
038: import org.cougaar.servicediscovery.util.UDDIConstants;
039: import org.cougaar.servicediscovery.description.MMRoleQuery;
040: import org.cougaar.servicediscovery.description.ScoredServiceDescriptionImpl;
041: import org.cougaar.servicediscovery.description.ServiceClassification;
042: import org.cougaar.servicediscovery.description.ServiceClassificationImpl;
043: import org.cougaar.servicediscovery.description.ServiceInfo;
044: import org.cougaar.servicediscovery.service.RegistryQueryService;
045: import org.cougaar.servicediscovery.transaction.MMQueryRequest;
046: import org.cougaar.servicediscovery.transaction.MMQueryRequestImpl;
047: import org.cougaar.servicediscovery.transaction.RegistryQuery;
048: import org.cougaar.servicediscovery.transaction.RegistryQueryImpl;
049: import org.cougaar.util.UnaryPredicate;
050:
051: /**
052: * The Matchmaker is responsible for taking service discovery requests (MMQueryRequests)
053: * from the SDClient, and issuing asynchronous queries to the YP
054: * to find matching providers. When one (or more) is found, send the scored
055: * results back the SDClient on the MMQueryRequest.
056: *<p>
057: * This version assumes the Role requested will be in the CommercialServiceScheme, and allows the YP
058: * to handle walking up YP communities as necessary. It does not handle quiescence,
059: * is not guaranteed to work with kills/restarts (persistence), only works
060: * with a distributed YP (using communities, not a single static instance), etc.
061: *
062: * @property org.cougaar.servicediscovery.plugin.SimpleMatchmakerQueryGracePeriod (in minutes, default is 2) specifies
063: * how long to wait before YP query errors should be logged at ERROR instead of DEBUG.
064: */
065: public class SimpleMatchmakerPlugin extends ComponentPlugin {
066: private static final String QUERY_GRACE_PERIOD_PROPERTY = "org.cougaar.servicediscovery.plugin.SimpleMatchmakerQueryGracePeriod";
067: private static final int DEFAULT_WARNING_SUPPRESSION_INTERVAL = 2; // minutes
068: private static final int WARNING_SUPPRESSION_INTERVAL; // minutes
069:
070: static {
071: WARNING_SUPPRESSION_INTERVAL = Integer.getInteger(
072: QUERY_GRACE_PERIOD_PROPERTY,
073: DEFAULT_WARNING_SUPPRESSION_INTERVAL).intValue();
074: }
075:
076: private long myWarningCutoffTime = 0;
077:
078: protected LoggingService myLoggingService;
079: private RegistryQueryService myRegistryQueryService;
080: private IncrementalSubscription myClientRequestSubscription;
081:
082: /** outstanding RQ are those which have been issued but have not yet returned */
083: private ArrayList myOutstandingRQs = new ArrayList();
084:
085: /** pending RQs are returned RQ which haven't been consumed by the plugin yet */
086: private ArrayList myPendingRQs = new ArrayList();
087:
088: /** Subscribe to MMQueryRequests from the SDClient */
089: private UnaryPredicate myQueryRequestPredicate = new UnaryPredicate() {
090: public boolean execute(Object o) {
091: if (o instanceof MMQueryRequest) {
092: MMQueryRequest qr = (MMQueryRequest) o;
093: return (qr.getQuery() instanceof MMRoleQuery);
094: }
095: return false;
096: }
097: };
098:
099: /** Reflection sets the RegistryQueryService at startup - plugin will not load if not found. */
100: public void setRegistryQueryService(RegistryQueryService rqs) {
101: myRegistryQueryService = rqs;
102: }
103:
104: /**
105: * Over-ride parent load to get non-essential services (the log service here).
106: */
107: public void load() {
108: super .load();
109:
110: myLoggingService = (LoggingService) getServiceBroker()
111: .getService(this , LoggingService.class, null);
112: if (myLoggingService == null) {
113: myLoggingService = LoggingService.NULL;
114: }
115: }
116:
117: /**
118: * Every load() should have an unload(), to unload() the manually loaded service.
119: */
120: public void unload() {
121: if (myRegistryQueryService != null) {
122: getServiceBroker().releaseService(this ,
123: RegistryQueryService.class, myRegistryQueryService);
124: myRegistryQueryService = null;
125: }
126:
127: if ((myLoggingService != null)
128: && (myLoggingService != LoggingService.NULL)) {
129: getServiceBroker().releaseService(this ,
130: LoggingService.class, myLoggingService);
131: myLoggingService = null;
132: }
133:
134: super .unload();
135: }
136:
137: /**
138: * Subscribe to MMQueryRequests from the SDClientPlugin.
139: */
140: protected void setupSubscriptions() {
141: myClientRequestSubscription = (IncrementalSubscription) getBlackboardService()
142: .subscribe(myQueryRequestPredicate);
143: }
144:
145: /**
146: * Loop through new MMQueryRequests, posting a new asynchronous YP query for each. Also
147: * runs when the YP query callbacks signal that the plugin should run. So loop through the pending
148: * requests, posting alarms to retry those with errors later, and sending back the answer
149: * to the SDClient for those that have a list of providers.
150: *<p>
151: * Note that you must specify the service scheme in which Roles will be found in the YP,
152: * through the helper getServiceSchemeForRoles() method.
153: */
154: protected void execute() {
155: // Look at all the new MMQueryRequests from the SDClientPlugin
156: if (myClientRequestSubscription.hasChanged()) {
157: for (Iterator i = myClientRequestSubscription
158: .getAddedCollection().iterator(); i.hasNext();) {
159: MMQueryRequest queryRequest = (MMQueryRequest) i.next();
160: MMRoleQuery query = (MMRoleQuery) queryRequest
161: .getQuery();
162: RegistryQuery rq = new RegistryQueryImpl();
163: RQ r;
164:
165: // Find all service providers for specifed Role (as code and name) under the given service scheme.
166: ServiceClassification roleSC = new ServiceClassificationImpl(
167: query.getRole().toString(), query.getRole()
168: .toString(), getServiceSchemeForRoles());
169: rq.addServiceClassification(roleSC);
170: if (myLoggingService.isDebugEnabled())
171: myLoggingService.debug("RegistryQuery = " + rq
172: + " " + rq.getServiceClassifications());
173: r = new RQ(queryRequest, query, rq);
174:
175: // Put the new query in a queue, issuing an asynchronous request with a callback
176: postRQ(r);
177: }
178: } // end of block for new requests from the SDClientPlugin
179:
180: // Now handle any callbacks that have come in. When they come in, they are put
181: // on the pending Queue, and then the Blackboard is signaled to run this plugin. That is
182: // when we get here.
183: RQ r;
184: while ((r = getPendingRQ()) != null) {
185: if (r.exception == null) {
186: // This means it succeeded -- send the found providers back to the SDClient
187: handleResponse(r);
188: } else {
189: // There was an exception. Log and retry later (by setting an alarm)
190: handleException(r);
191: }
192: } // end of loop over pending Q, which should now be empty
193:
194: // If at this point we have nothing on the post queue (new queries we just sent)
195: // nor outstanding alarms (queries that failed and we're retrying later), then
196: // we are done (until the next SDClient query come in).
197: } // end of execute()
198:
199: /**
200: * Return the UDDI Service Scheme that contains the Roles we will look for.
201: * This is typically the COMMERCIAL_SERVICE_SCHEME or MILITARY_SERVICE_SCHEME.
202: * <p>
203: * This method may be the only one an extender of this plugin needs to over-ride.
204: * @return UDDI Service Scheme to find Roles in
205: */
206: protected String getServiceSchemeForRoles() {
207: return UDDIConstants.COMMERCIAL_SERVICE_SCHEME;
208: }
209:
210: /**
211: * Handle an exception from a YP query callback by logging something, waiting a while, and trying again.
212: * @param r YP query that had error.
213: */
214: protected void handleException(RQ r) {
215: retryErrorLog(r, ": Exception querying YP registry for "
216: + r.query.getRole().toString()
217: + ", will try again later.", r.exception);
218: r.exception = null;
219: }
220:
221: /**
222: * Handle an error with a YP lookup request by logging and retrying later.
223: * @param r YP query that had error.
224: * @param message Pretty message to print to explain the error
225: */
226: private void retryErrorLog(RQ r, String message) {
227: retryErrorLog(r, message, null);
228: }
229:
230: /**
231: * When an error occurs, but we'll be retrying later, treat it as a DEBUG
232: * at first. After a while it becomes an error.
233: * @param r YP query that had error.
234: * @param message Pretty message to print to explain the error
235: * @param e The exception that caused the problem
236: */
237: private void retryErrorLog(RQ r, String message, Throwable e) {
238: // This needs to be random to avoid them all firing at once... FIXME!!!
239: int rand = (int) (Math.random() * 10000) + 1000;
240: QueryAlarm alarm = new QueryAlarm(r, getAlarmService()
241: .currentTimeMillis()
242: + rand);
243: getAlarmService().addAlarm(alarm);
244:
245: if (myLoggingService.isDebugEnabled()) {
246: myLoggingService
247: .debug("retryErrorLog - adding a QueryAlarm for "
248: + r.query.getRole() + ", alarm: " + alarm);
249: }
250:
251: // If we've waited to allow startup errors and now still getting errors,
252: // then log these as errors. Until then, log them as DEBUG.
253: if (System.currentTimeMillis() > getWarningCutoffTime()) {
254: if (e == null)
255: myLoggingService.error(message);
256: else
257: myLoggingService.error(message, e);
258: } else if (myLoggingService.isDebugEnabled()) {
259: if (e == null)
260: myLoggingService.debug(message);
261: else
262: myLoggingService.debug(message, e);
263: }
264: }
265:
266: /**
267: * Process a real response from the YP for a lookup. Score it using the
268: * scoring function on the original query. If it scores well,
269: * then put this service provider on the response back to the SDClient.
270: * If we don't have a good provider to send the SDClient, then check to
271: * see if the YP said there was another higher (parent) YP server to check.
272: * If there is, then post the query to that server. Otherwise, we've failed.
273: */
274: protected void handleResponse(RQ r) {
275: MMQueryRequest queryRequest = r.queryRequest;
276: MMRoleQuery query = r.query;
277:
278: Collection services = r.services;
279:
280: if (myLoggingService.isDebugEnabled()) {
281: myLoggingService
282: .debug("handleResponse - registry query result size is : "
283: + services.size()
284: + " for query: "
285: + query.getRole().toString());
286: }
287:
288: ArrayList scoredServiceDescriptions = new ArrayList();
289: for (Iterator iter = services.iterator(); iter.hasNext();) {
290: ServiceInfo serviceInfo = (ServiceInfo) iter.next();
291:
292: // This is where the custom ServiceInfoScorer gets used. We match the service
293: // provider returned by the YP against our rules, to decide if it will do.
294: // For the pizza app, that is the RoleWithBlacklistScorer.
295: // We insist on a positive score.
296: int score = query.getServiceInfoScorer().scoreServiceInfo(
297: serviceInfo);
298:
299: if (score >= 0) {
300: scoredServiceDescriptions
301: .add(new ScoredServiceDescriptionImpl(score,
302: serviceInfo));
303: if (myLoggingService.isDebugEnabled()) {
304: myLoggingService
305: .debug(": execute: adding Provider name - "
306: + serviceInfo.getProviderName()
307: + " Service name: "
308: + serviceInfo.getServiceName()
309: + " Service score: " + score);
310: }
311: } else {
312: // Negative score means provider didn't pass. We'll keep looking.
313: if (myLoggingService.isDebugEnabled()) {
314: myLoggingService
315: .debug(": execute: ignoring Provider name - "
316: + serviceInfo.getProviderName()
317: + " Service name: "
318: + serviceInfo.getServiceName()
319: + " Service score: " + score);
320: }
321: }
322: } // end of loop over services found in the YP on last look-up
323:
324: // If we didn't find any services that scored well
325: if (scoredServiceDescriptions.isEmpty()) {
326: // Is there another higher-level YP community?
327: if (!r.nextContextFailed) {
328: // We just didn't find any yet! Re-post, so we recurse up to the next higher
329: // "context", or YP Community. (in our example, from Cambridge to MA)
330: if (myLoggingService.isDebugEnabled()) {
331: myLoggingService
332: .debug(":execute - no matching provider for "
333: + query.getRole()
334: + " in "
335: + r.currentYPContext
336: + " retrying in next context.");
337: }
338: // post the query to the next YP server
339: postRQ(r);
340: } else {
341: // Couldn't find another YPServer to search
342: retryErrorLog(r, ": unable to find provider for "
343: + query.getRole()
344: + ", publishing empty query result. "
345: + "Will try query again later.");
346:
347: }
348: } else {
349: // We have some results. Sort them by score, so the client gets the best one.
350: Collections.sort(scoredServiceDescriptions);
351: }
352:
353: // Set our results on the request from the client, and publish change it to send it back
354: ((MMQueryRequestImpl) queryRequest)
355: .setResult(scoredServiceDescriptions);
356: ((MMQueryRequestImpl) queryRequest).setQueryCount(queryRequest
357: .getQueryCount() + 1);
358: getBlackboardService().publishChange(queryRequest);
359:
360: if (myLoggingService.isDebugEnabled()) {
361: myLoggingService.debug(": publishChanged query");
362: }
363: }
364:
365: /**
366: * Get the real time after which DEBUG level problems become ERROR: This is
367: * the parametrized warning cut-off interval (in minutes, default of 2), plus
368: * the time of the first error.
369: * @return time in millis
370: */
371: protected long getWarningCutoffTime() {
372: if (myWarningCutoffTime == 0) {
373: myWarningCutoffTime = System.currentTimeMillis()
374: + WARNING_SUPPRESSION_INTERVAL * 60000;
375: }
376:
377: return myWarningCutoffTime;
378: }
379:
380: /**
381: * Inner class representing status of a query to send to the YP.
382: */
383: private class RQ {
384: MMQueryRequest queryRequest; // object sent from SDClient
385: MMRoleQuery query; // contained in above
386: RegistryQuery rq; // The actual query to send to the YP
387:
388: Collection services; // services found in YP
389: Exception exception; // exception, if any, from last YP query
390: boolean complete = false;
391: Object previousYPContext = null; // last YP server searched
392: Object currentYPContext = null; // next YP server to search
393: boolean nextContextFailed = false; // is there a next YP server/
394:
395: RQ(MMQueryRequest queryRequest, MMRoleQuery query,
396: RegistryQuery rq) {
397: this .queryRequest = queryRequest;
398: this .query = query;
399: this .rq = rq;
400: }
401: }
402:
403: /** Issue an asynchronous request to the YP, noting the outstanding query. */
404: private void postRQ(final RQ r) {
405: if (myLoggingService.isDebugEnabled()) {
406: myLoggingService.debug(": postRQ " + r + " (" + r.rq + ")");
407: }
408:
409: // The list of outstanding requests gets touched by the callback (YP) thread
410: // and the plugin thread, so synchronize to avoid concurrent-mod exceptions.
411: synchronized (myOutstandingRQs) {
412: myOutstandingRQs.add(r);
413: }
414:
415: // MatchmakerStubPlugin has another alternative -- where there is a single
416: // fixed YP server
417: findServiceWithDistributedYP(r);
418: }
419:
420: /** Note an asynchronous response from the YP, and wake the plugin to handle it in the plugin thread. */
421: private void pendRQ(RQ r) {
422: if (myLoggingService.isDebugEnabled()) {
423: myLoggingService.debug(": pendRQ " + r + " (" + r.rq + ")");
424: }
425:
426: // The current request finished (for good or ill)
427: r.complete = true;
428:
429: // Again, since these get touched in 2 threads, syncrhonize
430: synchronized (myOutstandingRQs) {
431: myOutstandingRQs.remove(r);
432: }
433: synchronized (myPendingRQs) {
434: myPendingRQs.add(r);
435: }
436: // tell the plugin to wake up, so the execute() method gets called
437: getBlackboardService().signalClientActivity();
438: }
439:
440: /** Pop a pending RQ of the list (or null) so that we can deal with it. */
441: private RQ getPendingRQ() {
442: RQ r = null;
443: synchronized (myPendingRQs) {
444: if (!myPendingRQs.isEmpty()) {
445: r = (RQ) myPendingRQs.remove(0); // treat like a fifo
446: if (myLoggingService.isDebugEnabled()) {
447: myLoggingService.debug(": getPendingRQ " + r + " ("
448: + r.rq + ")");
449: }
450: }
451: }
452: return r;
453: }
454:
455: /**
456: * This is the workhourse: ask the YP to find a service that matches the given request,
457: * and to tell us when it is done with a callback we supply. It asks the YP to
458: * walk up the hierarchy of YP communities to find ever broader YP servers if that is necessary.
459: */
460: protected void findServiceWithDistributedYP(final RQ r) {
461: if (myLoggingService.isDebugEnabled()) {
462: myLoggingService.debug(": findServiceWithDistributedYP - "
463: + " using YPCommunity search.");
464: }
465:
466: // This is the main method. It issues the query to the YP server,
467: // through the RegistryQueryService, which is provided by the
468: // o.c.sd.service.UDDI4JRegistrationQueryServiceComponent.
469: // Here, we tell the YP to start looking in the currentYPContext (YP server),
470: // satisfying the given query, and to let us know using the given callback.
471: myRegistryQueryService.findServiceAndBinding(
472: r.currentYPContext, r.rq,
473: new RegistryQueryService.CallbackWithContext() {
474: public void invoke(Object result) {
475: // Take the given services results for our query
476: r.services = (Collection) result;
477: if (myLoggingService.isDebugEnabled()) {
478: myLoggingService.debug(": results = "
479: + result + " for "
480: + r.currentYPContext);
481: }
482: // And update the queues
483: flush();
484: }
485:
486: public void handle(Exception e) {
487: // Got some sort of error trying to do the YP lookup
488: r.exception = e;
489: if (myLoggingService.isDebugEnabled()) {
490: myLoggingService.debug(
491: ": failed during query of "
492: + r.queryRequest
493: + " context = "
494: + r.currentYPContext, e);
495: }
496: // And update the queues
497: flush();
498: }
499:
500: public void setNextContext(Object context) {
501: // If one YP community / context doesn't have a matching provider,
502: // we move up to the next one. IE, when Cambridge only
503: // contains a blacklisted provider (Joes), we move up to MA.
504: if (myLoggingService.isDebugEnabled())
505: myLoggingService
506: .debug(": previous YPContext "
507: + r.currentYPContext
508: + " current YPContext "
509: + context);
510:
511: r.previousYPContext = r.currentYPContext;
512: r.currentYPContext = context;
513:
514: // If there is no next context, then we're out of YP servers!
515: if (context == null) {
516: r.nextContextFailed = true;
517: }
518: }
519:
520: private void flush() {
521: // Put the request on the pending list for handling the next time the plugin runs
522: pendRQ(r);
523: }
524: });
525: } // end of findServiceWithDistributedYP
526:
527: /** Alarm to post a query to the YP after some time. */
528: public class QueryAlarm implements Alarm {
529: private long expiresAt; // when the alarm will fire
530: private boolean expired = false; // has the alarm fired
531: private RQ rq = null;
532:
533: public QueryAlarm(RQ rq, long expirationTime) {
534: expiresAt = expirationTime;
535: this .rq = rq;
536: }
537:
538: public long getExpirationTime() {
539: return expiresAt;
540: }
541:
542: /** When the alarm fires, post the query to the YP. */
543: public synchronized void expire() {
544: if (!expired) {
545: expired = true;
546: rq.complete = false; // mark the query as in-progress
547: postRQ(rq);
548: }
549: }
550:
551: public boolean hasExpired() {
552: return expired;
553: }
554:
555: public synchronized boolean cancel() {
556: boolean was = expired;
557: expired = true;
558: return was;
559: }
560:
561: public String toString() {
562: return "<QueryAlarm " + expiresAt
563: + (expired ? "(Expired) " : " ")
564: + rq.query.getRole() + " "
565: + "for MatchmakerPlugin at " + getAgentIdentifier()
566: + ">";
567: }
568: } // end of QueryAlarm definition
569: }
|