001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.servicediscovery.plugin;
028:
029: import java.util.ArrayList;
030: import java.util.Collection;
031: import java.util.Collections;
032: import java.util.Date;
033: import java.util.Iterator;
034: import java.util.List;
035:
036: import org.cougaar.core.agent.service.alarm.Alarm;
037: import org.cougaar.core.blackboard.IncrementalSubscription;
038: import org.cougaar.core.service.AgentIdentificationService;
039: import org.cougaar.core.service.LoggingService;
040: import org.cougaar.core.service.QuiescenceReportService;
041: import org.cougaar.planning.ldm.plan.Role;
042: import org.cougaar.planning.plugin.legacy.SimplePlugin;
043: import org.cougaar.servicediscovery.Constants;
044: import org.cougaar.servicediscovery.description.Lineage;
045: import org.cougaar.servicediscovery.description.MMRoleQuery;
046: import org.cougaar.servicediscovery.description.ScoredServiceDescriptionImpl;
047: import org.cougaar.servicediscovery.description.ServiceClassification;
048: import org.cougaar.servicediscovery.description.ServiceClassificationImpl;
049: import org.cougaar.servicediscovery.description.ServiceInfo;
050: import org.cougaar.servicediscovery.service.RegistryQueryService;
051: import org.cougaar.servicediscovery.transaction.MMQueryRequest;
052: import org.cougaar.servicediscovery.transaction.MMQueryRequestImpl;
053: import org.cougaar.servicediscovery.transaction.RegistryQuery;
054: import org.cougaar.servicediscovery.transaction.RegistryQueryImpl;
055: import org.cougaar.servicediscovery.util.UDDIConstants;
056: import org.cougaar.util.TimeSpan;
057: import org.cougaar.util.UnaryPredicate;
058:
059: /**
060: *
061: * Query the YellowPages for possible service providers
062: *
063: */
064: public class MatchmakerStubPlugin extends SimplePlugin {
// Grace period, in minutes, during which retryable query failures are logged
// at DEBUG rather than ERROR. getWarningCutoffTime() overwrites it with the
// value of the QUERY_GRACE_PERIOD_PROPERTY system property when that is set.
private static int WARNING_SUPPRESSION_INTERVAL = 2;
// System time (ms) after which retryable failures are logged as errors.
// 0 means the cutoff has not been computed yet (see getWarningCutoffTime()).
private long myWarningCutoffTime = 0;
// Name of the system property that supplies the grace period in minutes.
private static final String QUERY_GRACE_PERIOD_PROPERTY = "org.cougaar.servicediscovery.plugin.QueryGracePeriod";

// Taken from the first plugin parameter in setupSubscriptions(); selects the
// distributed rather than the centralized YP lookup in postRQ().
private boolean myDistributedYPServers;
// String form of the agent identifier, used as a prefix in log messages.
private String myAgentName;
// Services obtained in load() and released in unload().
private LoggingService myLoggingService;
private RegistryQueryService myRegistryQueryService;
private QuiescenceReportService myQuiescenceReportService;
private AgentIdentificationService myAgentIdentificationService;
// Blackboard subscriptions created in setupSubscriptions().
private IncrementalSubscription myClientRequestSubscription;
private IncrementalSubscription myLineageSubscription;

// pending RQs are returned RQ which haven't been consumed by the plugin yet
// (filled by pendRQ(), drained by getPendingRQ(); guarded by synchronizing on the list)
private ArrayList myPendingRQs = new ArrayList();

// Outstanding RQ are those which have been issued but have not yet returned
// used to support quiescence.
// (guarded by synchronizing on the list)
private ArrayList myOutstandingRQs = new ArrayList();

// Outstanding alarms (any means non-quiescent)
// used to support quiescence.
// (guarded by synchronizing on the list)
private ArrayList myOutstandingAlarms = new ArrayList();
088:
089: private UnaryPredicate myLineagePredicate = new UnaryPredicate() {
090: public boolean execute(Object o) {
091: return (o instanceof Lineage);
092: }
093: };
094:
095: private UnaryPredicate myQueryRequestPredicate = new UnaryPredicate() {
096: public boolean execute(Object o) {
097: if (o instanceof MMQueryRequest) {
098: MMQueryRequest qr = (MMQueryRequest) o;
099: return (qr.getQuery() instanceof MMRoleQuery);
100: }
101: return false;
102: }
103: };
104:
105: public void load() {
106: super .load();
107:
108: myLoggingService = (LoggingService) getBindingSite()
109: .getServiceBroker().getService(this ,
110: LoggingService.class, null);
111: if (myLoggingService == null) {
112: myLoggingService = LoggingService.NULL;
113: }
114:
115: myRegistryQueryService = (RegistryQueryService) getBindingSite()
116: .getServiceBroker().getService(this ,
117: RegistryQueryService.class, null);
118: myAgentIdentificationService = (AgentIdentificationService) getBindingSite()
119: .getServiceBroker().getService(this ,
120: AgentIdentificationService.class, null);
121:
122: // Set up the QuiescenceReportService so that while waiting for the YP and
123: // alarms we don't go quiescent by mistake
124: myQuiescenceReportService = (QuiescenceReportService) getBindingSite()
125: .getServiceBroker().getService(this ,
126: QuiescenceReportService.class, null);
127:
128: if (myQuiescenceReportService != null)
129: myQuiescenceReportService
130: .setAgentIdentificationService(myAgentIdentificationService);
131:
132: if (myRegistryQueryService == null)
133: throw new RuntimeException(
134: "Unable to obtain RegistryQuery service");
135: }
136:
137: public void unload() {
138: if (myRegistryQueryService != null) {
139: getBindingSite().getServiceBroker().releaseService(this ,
140: RegistryQueryService.class, myRegistryQueryService);
141: myRegistryQueryService = null;
142: }
143:
144: if (myQuiescenceReportService != null) {
145: getBindingSite().getServiceBroker().releaseService(this ,
146: QuiescenceReportService.class,
147: myQuiescenceReportService);
148: myQuiescenceReportService = null;
149: }
150:
151: if (myAgentIdentificationService != null) {
152: getBindingSite().getServiceBroker().releaseService(this ,
153: AgentIdentificationService.class,
154: myAgentIdentificationService);
155: myAgentIdentificationService = null;
156: }
157:
158: if ((myLoggingService != null)
159: && (myLoggingService != LoggingService.NULL)) {
160: getBindingSite().getServiceBroker().releaseService(this ,
161: LoggingService.class, myLoggingService);
162: myLoggingService = null;
163: }
164: super .unload();
165: }
166:
167: protected void setupSubscriptions() {
168: myAgentName = getBindingSite().getAgentIdentifier().toString();
169:
170: myClientRequestSubscription = (IncrementalSubscription) subscribe(myQueryRequestPredicate);
171: myLineageSubscription = (IncrementalSubscription) subscribe(myLineagePredicate);
172:
173: Collection params = getDelegate().getParameters();
174: if (params.size() > 0) {
175: myDistributedYPServers = Boolean.valueOf(
176: (String) params.iterator().next()).booleanValue();
177: } else {
178: myDistributedYPServers = false;
179: }
180:
181: if (didRehydrate()) {
182: // Requeue any unanswered MMQueries
183: for (Iterator iterator = myClientRequestSubscription
184: .iterator(); iterator.hasNext();) {
185: MMQueryRequest queryRequest = (MMQueryRequest) iterator
186: .next();
187: MMRoleQuery query = (MMRoleQuery) queryRequest
188: .getQuery();
189: Collection result = queryRequest.getResult();
190:
191: if ((!query.getObsolete())
192: && ((result == null) || (result.isEmpty()))) {
193: if (myLoggingService.isDebugEnabled()) {
194: myLoggingService
195: .debug("setupSubscription: putting "
196: + queryRequest
197: + " back on queue after rehydration.");
198: addRQ(queryRequest);
199: }
200: }
201: }
202: }
203: }
204:
205: protected void execute() {
206: if (myClientRequestSubscription.hasChanged()) {
207:
208: for (Iterator i = myClientRequestSubscription
209: .getAddedCollection().iterator(); i.hasNext();) {
210: addRQ((MMQueryRequest) i.next());
211: }
212: }
213:
214: RQ r;
215: while ((r = getPendingRQ()) != null) {
216: if (!r.query.getObsolete()) {
217: if (r.exception == null) {
218: handleResponse(r);
219: } else {
220: handleException(r);
221: }
222: } else {
223: // Don't bother to process the response to obsolete queries
224: if (myLoggingService.isDebugEnabled()) {
225: myLoggingService
226: .debug("execute: ignoring obsolete registry query - "
227: + r);
228: }
229: }
230: }
231:
232: handleQuiescenceReport();
233: }
234:
235: protected void handleException(RQ r) {
236: // Log the error and try again later
237: retryErrorLog(r, getAgentIdentifier()
238: + " Exception querying registry for "
239: + r.query.getRole().toString() + ", try again later.",
240: r.exception);
241: r.exception = null;
242: }
243:
244: private void retryErrorLog(RQ r, String message) {
245: retryErrorLog(r, message, null);
246: }
247:
248: // When an error occurs, but we'll be retrying later, treat it as a DEBUG
249: // at first. After a while it becomes an error.
250: private void retryErrorLog(RQ r, String message, Throwable e) {
251: int rand = (int) (Math.random() * 10000) + 1000;
252: QueryAlarm alarm = new QueryAlarm(r, getAlarmService()
253: .currentTimeMillis()
254: + rand);
255: getAlarmService().addAlarm(alarm);
256: // Alarms silently make us non-quiescent -- so keep track of when we have any
257: synchronized (myOutstandingAlarms) {
258: myOutstandingAlarms.add(alarm);
259: }
260:
261: if (myLoggingService.isDebugEnabled()) {
262: myLoggingService.debug(getAgentIdentifier()
263: + " adding a QueryAlarm for r " + r + " alarm - "
264: + alarm);
265: }
266:
267: if (System.currentTimeMillis() > getWarningCutoffTime()) {
268: if (e == null)
269: myLoggingService.error(getAgentIdentifier() + message);
270: else
271: myLoggingService.error(getAgentIdentifier() + message,
272: e);
273: } else if (myLoggingService.isDebugEnabled()) {
274: if (e == null)
275: myLoggingService.debug(getAgentIdentifier() + message);
276: else
277: myLoggingService.debug(getAgentIdentifier() + message,
278: e);
279: }
280: }
281:
282: protected void addRQ(MMQueryRequest queryRequest) {
283: MMRoleQuery query = (MMRoleQuery) queryRequest.getQuery();
284:
285: RegistryQuery rq = new RegistryQueryImpl();
286: RQ r;
287:
288: // Find all service providers for specifed Role
289: ServiceClassification roleSC = new ServiceClassificationImpl(
290: query.getRole().toString(), query.getRole().toString(),
291: UDDIConstants.MILITARY_SERVICE_SCHEME);
292: rq.addServiceClassification(roleSC);
293:
294: if (myDistributedYPServers) {
295: r = new RQ(queryRequest, query, rq);
296: } else {
297: r = new RQ(queryRequest, query, rq);
298: }
299:
300: postRQ(r);
301: }
302:
303: protected void handleResponse(RQ r) {
304: MMQueryRequest queryRequest = r.queryRequest;
305: MMRoleQuery query = r.query;
306:
307: Collection services = r.services;
308:
309: if (myLoggingService.isDebugEnabled()) {
310: myLoggingService.debug(myAgentName
311: + " registry query result size is : "
312: + services.size() + " for query: "
313: + query.getRole().toString() + " "
314: + new Date(query.getTimeSpan().getStartTime())
315: + " to "
316: + new Date(query.getTimeSpan().getEndTime()));
317: }
318:
319: ArrayList scoredServiceDescriptions = new ArrayList();
320: for (Iterator iter = services.iterator(); iter.hasNext();) {
321: ServiceInfo serviceInfo = (ServiceInfo) iter.next();
322: int score = query.getServiceInfoScorer().scoreServiceInfo(
323: serviceInfo);
324:
325: if (score >= 0) {
326: scoredServiceDescriptions
327: .add(new ScoredServiceDescriptionImpl(score,
328: serviceInfo));
329: if (myLoggingService.isDebugEnabled()) {
330: myLoggingService.debug(myAgentName
331: + ":execute: adding Provider name: "
332: + serviceInfo.getProviderName()
333: + " Service name: "
334: + serviceInfo.getServiceName()
335: + " Service score: " + score);
336: }
337: } else {
338: if (myLoggingService.isDebugEnabled()) {
339: myLoggingService.debug(myAgentName
340: + ":execute: ignoring Provider name: "
341: + serviceInfo.getProviderName()
342: + " Service name: "
343: + serviceInfo.getServiceName()
344: + " Service score: " + score);
345: }
346: }
347: }
348:
349: if (scoredServiceDescriptions.isEmpty()) {
350: if ((myDistributedYPServers) && (!r.getNextContextFailed)) {
351: if (myLoggingService.isDebugEnabled()) {
352: myLoggingService.debug(myAgentName
353: + " no matching provider for "
354: + query.getRole() + " in "
355: + r.currentYPContext
356: + " retrying in next context.");
357: }
358: postRQ(r);
359: } else {
360: // Couldn't find another YPServer to search
361: retryErrorLog(r, myAgentName
362: + " unable to find provider for "
363: + query.getRole()
364: + ", publishing empty query result. "
365: + "Will try query again later.");
366:
367: }
368: } else {
369: Collections.sort(scoredServiceDescriptions);
370: }
371:
372: ((MMQueryRequestImpl) queryRequest)
373: .setResult(scoredServiceDescriptions);
374: ((MMQueryRequestImpl) queryRequest).setQueryCount(queryRequest
375: .getQueryCount() + 1);
376: getBlackboardService().publishChange(queryRequest);
377:
378: if (myLoggingService.isDebugEnabled()) {
379: myLoggingService.debug(myAgentName
380: + ": publishChanged query");
381: }
382: }
383:
384: protected void handleQuiescenceReport() {
385: // Whenever we submit a query to the YP we go off into the ether
386: // So if there are outstanding YP queries or alarms, then mark the fact that we are not done yet
387: // so that Quiescence stuff doesnt decide we're done prematurely early
388: // Note that myPendingRQs should _always_ be empty at this point. And we'll only have oustandingAlarms
389: // if there were exceptions talking to the YP.
390:
391: if (myQuiescenceReportService != null) {
392:
393: // Check if done with YP queries in synch blocks
394: // since callbacks may be running
395: boolean noOutStandingRQs = false;
396: synchronized (myOutstandingRQs) {
397:
398: // Remove any queries marked as obsolete
399: for (Iterator iterator = myOutstandingRQs.iterator(); iterator
400: .hasNext();) {
401: RQ outstandingRQ = (RQ) iterator.next();
402: if (outstandingRQ.query.getObsolete()) {
403: iterator.remove();
404:
405: if (myLoggingService.isDebugEnabled()) {
406: myLoggingService
407: .debug("handleQuiescenceReport: removing obsolete RQ - "
408: + outstandingRQ
409: + " - from myOutstandingRQs.");
410: }
411: }
412: }
413: noOutStandingRQs = myOutstandingRQs.isEmpty();
414: }
415:
416: boolean noPendRQs = false;
417: synchronized (myPendingRQs) {
418: noPendRQs = myPendingRQs.isEmpty();
419: }
420:
421: boolean noOutstandingAlarms = false;
422: synchronized (myOutstandingAlarms) {
423: noOutstandingAlarms = myOutstandingAlarms.isEmpty();
424: }
425:
426: if (noOutStandingRQs && noPendRQs && noOutstandingAlarms) {
427: // Nothing on the lists and no outstanding alarms - so we're done
428: myQuiescenceReportService.setQuiescentState();
429: resetWarningCutoffTime();
430: if (myLoggingService.isInfoEnabled())
431: myLoggingService
432: .info(myAgentName
433: + " finished all YP queries. Now quiescent.");
434:
435: } else {
436: // Some query waiting for an answer, or waiting for this Plugin to
437: // handle it, or waiting to retry a query
438: // We're not done
439:
440: myQuiescenceReportService.clearQuiescentState();
441: if (myLoggingService.isInfoEnabled())
442: myLoggingService
443: .info(myAgentName
444: + " has outstanding YP queries or answers. "
445: + "Not quiescent.");
446: if (myLoggingService.isDebugEnabled()) {
447: // Get the toStrings in synch blocks since callbacks
448: // may currently be executing
449: String outstandingRQs = "";
450: String pendingRQs = "";
451: String outstandingAlarms = "";
452: synchronized (myOutstandingRQs) {
453: outstandingRQs = myOutstandingRQs.size() + ". "
454: + myOutstandingRQs.toString();
455: }
456: synchronized (myPendingRQs) {
457: pendingRQs = myPendingRQs.size() + ". "
458: + myPendingRQs.toString();
459: }
460:
461: synchronized (myOutstandingAlarms) {
462: outstandingAlarms = myOutstandingAlarms.size()
463: + ". " + myOutstandingAlarms.toString();
464: }
465:
466: myLoggingService
467: .debug("\tYP questions outstanding: "
468: +
469: // AMH: Actually print the outstanding RQs
470: // myOutstandingRQs.size() +
471: outstandingRQs
472: + ". YP answers to process: "
473: + pendingRQs
474: +
475: // myPendingRQs.size() +
476: ". Outstanding alarms: "
477: + outstandingAlarms);
478: }
479: }
480: }
481: }
482:
483: protected long getWarningCutoffTime() {
484: if (myWarningCutoffTime == 0) {
485: WARNING_SUPPRESSION_INTERVAL = Integer.getInteger(
486: QUERY_GRACE_PERIOD_PROPERTY,
487: WARNING_SUPPRESSION_INTERVAL).intValue();
488: myWarningCutoffTime = System.currentTimeMillis()
489: + WARNING_SUPPRESSION_INTERVAL * 60000;
490: }
491:
492: return myWarningCutoffTime;
493: }
494:
495: protected void resetWarningCutoffTime() {
496: myWarningCutoffTime = -1;
497: }
498:
499: protected Lineage getLineage(int lineageType, TimeSpan timeSpan) {
500: if (myLoggingService.isDebugEnabled()) {
501: myLoggingService.debug(myAgentName
502: + ": getLineage() requested lineage of type "
503: + Lineage.typeToRole(lineageType)
504: + new Date(timeSpan.getStartTime()) + " - "
505: + new Date(timeSpan.getEndTime()));
506: }
507:
508: for (Iterator iterator = myLineageSubscription.iterator(); iterator
509: .hasNext();) {
510: Lineage lineage = (Lineage) iterator.next();
511: if (lineage.getType() == lineageType) {
512: Collection lineageTimeSpans = lineage.getSchedule()
513: .getOverlappingScheduleElements(
514: timeSpan.getStartTime(),
515: timeSpan.getEndTime());
516:
517: if (!lineageTimeSpans.isEmpty()) {
518: TimeSpan lineageTimeSpan = (TimeSpan) lineageTimeSpans
519: .iterator().next();
520: if ((lineageTimeSpan.getStartTime() <= timeSpan
521: .getStartTime())
522: && (lineageTimeSpan.getEndTime() >= timeSpan
523: .getEndTime())) {
524:
525: if (myLoggingService.isDebugEnabled()) {
526: myLoggingService.debug(myAgentName
527: + ": getLineage() returning "
528: + lineage);
529: }
530: return lineage;
531: } else {
532: myLoggingService.warn(myAgentName
533: + ": getLineage() requested timeSpan "
534: + new Date(timeSpan.getStartTime())
535: + " - "
536: + new Date(timeSpan.getEndTime())
537: + " spans more than one lineage.");
538: return null;
539: }
540: }
541: }
542: }
543:
544: myLoggingService.error(myAgentName
545: + ": getLineage() requested lineage of type "
546: + Lineage.typeToRole(lineageType)
547: + new Date(timeSpan.getStartTime()) + " - "
548: + new Date(timeSpan.getEndTime())
549: + " does not match any lineage.\n"
550: + myLineageSubscription);
551: return null;
552: }
553:
554: private class RQ {
555: MMQueryRequest queryRequest;
556: MMRoleQuery query;
557: RegistryQuery rq;
558: Lineage ypLineage;
559:
560: Collection services;
561: Exception exception;
562: boolean complete = false;
563: Object previousYPContext = null;
564: Object currentYPContext = null;
565: boolean getNextContextFailed = false;
566:
567: RQ(MMQueryRequest queryRequest, MMRoleQuery query,
568: RegistryQuery rq) {
569: this .queryRequest = queryRequest;
570: this .query = query;
571: this .rq = rq;
572: this .ypLineage = null;
573: }
574:
575: // Verbose toString used when printing myOutstandingRQs in handleQuiescenceReport
576: public String toString() {
577: return "RQ for query <" + queryRequest.getUID()
578: + ": ResultCode " + queryRequest.getResultCode()
579: + ", query: " + query + "> using lineage "
580: + ypLineage + ". Has exception? "
581: + (exception == null ? "No" : "Yes")
582: + ". Complete? " + complete
583: + " getNextContextFailed? " + getNextContextFailed;
584: }
585: }
586:
587: // issue a async request
588: private void postRQ(final RQ r) {
589: // Don't bother to process obsolete queries
590: if (r.query.getObsolete()) {
591: if (myLoggingService.isDebugEnabled()) {
592: myLoggingService
593: .debug("postRQ: ignoring obsolete MMQueryRequest - "
594: + r.queryRequest);
595: }
596: // Update quiescence
597: handleQuiescenceReport();
598: return;
599: }
600:
601: if (myLoggingService.isDebugEnabled()) {
602: myLoggingService.debug(getAgentIdentifier() + ": posting "
603: + r + " (" + r.rq + ")");
604: }
605:
606: synchronized (myOutstandingRQs) {
607: myOutstandingRQs.add(r);
608: }
609:
610: if (myDistributedYPServers) {
611: findServiceWithDistributedYP(r);
612: } else {
613: findServiceWithCentralizedYP(r);
614: }
615: }
616:
617: // note an async response and wake the plugin
618: private void pendRQ(RQ r) {
619: if (myLoggingService.isDebugEnabled()) {
620: myLoggingService.debug(getAgentIdentifier() + " pending "
621: + r + " (" + r.rq + ")");
622: }
623: r.complete = true;
624: synchronized (myOutstandingRQs) {
625: myOutstandingRQs.remove(r);
626: }
627: synchronized (myPendingRQs) {
628: myPendingRQs.add(r);
629: }
630: wake(); // tell the plugin to wake up
631: }
632:
633: // get a pending RQ (or null) so that we can deal with it
634: private RQ getPendingRQ() {
635: RQ r = null;
636: synchronized (myPendingRQs) {
637: if (!myPendingRQs.isEmpty()) {
638: r = (RQ) myPendingRQs.remove(0); // treat like a fifo
639: if (myLoggingService.isDebugEnabled()) {
640: myLoggingService.debug(getAgentIdentifier()
641: + " retrieving " + r + " (" + r.rq + ")");
642: }
643: }
644: }
645: return r;
646: }
647:
648: private boolean useYPCommunitySearchPath(final RQ r) {
649: Lineage opconLineage = getLineage(Lineage.OPCON, r.query
650: .getTimeSpan());
651: Lineage adconLineage = getLineage(Lineage.ADCON, r.query
652: .getTimeSpan());
653:
654: if (myLoggingService.isDebugEnabled()) {
655: myLoggingService.debug(getAgentIdentifier()
656: + ": useYPCommunitySearchPath() " + " timeSpan = ("
657: + new Date(r.query.getTimeSpan().getStartTime())
658: + ", "
659: + new Date(r.query.getTimeSpan().getEndTime())
660: + ") adconLineage = " + adconLineage
661: + " opconLineage = " + opconLineage);
662: if ((opconLineage != null) && (adconLineage != null)) {
663: myLoggingService
664: .debug(getAgentIdentifier()
665: + ": useYPCommunitySearchPath()() "
666: + " adconLineage.getList() = "
667: + adconLineage.getList()
668: + " opconLineage.getList() = "
669: + opconLineage.getList()
670: + " opconLineage.getList().equals(adconLineage.getList()) == "
671: + opconLineage.getList().equals(
672: adconLineage.getList()));
673: }
674: }
675:
676: if ((adconLineage == null) || (opconLineage == null)) {
677: if (myLoggingService.isDebugEnabled()) {
678: myLoggingService
679: .debug(getAgentIdentifier()
680: + ": useYPCommunitySearchPath() "
681: + " returning false - "
682: + "no Administrative or Operational lineage."
683: + " ADCON lineage = "
684: + adconLineage
685: + " OPCON lineage = "
686: + opconLineage
687: + " for "
688: + new Date(r.query.getTimeSpan()
689: .getStartTime())
690: + " to "
691: + new Date(r.query.getTimeSpan()
692: .getEndTime()));
693: }
694:
695: return false;
696: } else {
697: return (opconLineage.getList().equals(adconLineage
698: .getList()));
699: }
700: }
701:
702: private void findServiceWithCentralizedYP(final RQ r) {
703: if (myLoggingService.isDebugEnabled()) {
704: myLoggingService.debug(getAgentIdentifier()
705: + " findServiceWithCentralizedYP: r = " + r);
706: }
707:
708: myRegistryQueryService.findServiceAndBinding(r.rq,
709: new RegistryQueryService.Callback() {
710: public void invoke(Object result) {
711: r.services = (Collection) result;
712: if (myLoggingService.isDebugEnabled()) {
713: myLoggingService.debug(getAgentIdentifier()
714: + " results = " + result + " for "
715: + r.currentYPContext);
716: }
717: flush();
718: }
719:
720: public void handle(Exception e) {
721: r.exception = e;
722: if (myLoggingService.isDebugEnabled()) {
723: myLoggingService.debug(getAgentIdentifier()
724: + " failed during query of "
725: + r.queryRequest, e);
726: }
727: flush();
728: }
729:
730: private void flush() {
731: pendRQ(r);
732: }
733: });
734: }
735:
736: private void findServiceWithDistributedYP(final RQ r) {
737: if (useYPCommunitySearchPath(r)) {
738: if (myLoggingService.isDebugEnabled()) {
739: myLoggingService.debug(getAgentIdentifier()
740: + " findServiceWithDistributedYP: "
741: + " using YPCommunity search.");
742: }
743:
744: r.ypLineage = null;
745: myRegistryQueryService.findServiceAndBinding(
746: r.currentYPContext, r.rq,
747: new RegistryQueryService.CallbackWithContext() {
748: public void invoke(Object result) {
749: r.services = (Collection) result;
750: if (myLoggingService.isDebugEnabled()) {
751: myLoggingService
752: .debug(getAgentIdentifier()
753: + " results = "
754: + result + " for "
755: + r.currentYPContext);
756: }
757: flush();
758: }
759:
760: public void handle(Exception e) {
761: r.exception = e;
762: if (myLoggingService.isDebugEnabled()) {
763: myLoggingService
764: .debug(
765: getAgentIdentifier()
766: + " failed during query of "
767: + r.queryRequest
768: + " context = "
769: + r.currentYPContext,
770: e);
771: }
772: flush();
773: }
774:
775: public void setNextContext(Object context) {
776: if (myLoggingService.isDebugEnabled()) {
777: myLoggingService
778: .debug(getAgentIdentifier()
779: + " previous YPContext "
780: + r.currentYPContext
781: + " current YPContext "
782: + context);
783: }
784: r.previousYPContext = r.currentYPContext;
785: r.currentYPContext = context;
786:
787: if (context == null) {
788: r.getNextContextFailed = true;
789: }
790: }
791:
792: private void flush() {
793: pendRQ(r);
794: }
795: });
796: } else {
797: r.ypLineage = getLineage(Lineage.OPCON, r.query
798: .getTimeSpan());
799:
800: if (r.ypLineage == null) {
801: String errorMessage = getAgentIdentifier()
802: + " no Operation lineage for "
803: + new Date(r.query.getTimeSpan().getStartTime())
804: + " to "
805: + new Date(r.query.getTimeSpan().getEndTime());
806:
807: // AMH: Don't retry here. Instead, let the while in execute()
808: // call handleException to do this.
809: r.exception = new IllegalStateException(errorMessage);
810: if (myLoggingService.isDebugEnabled())
811: myLoggingService
812: .debug("findServiceWithDistributedYP: "
813: + " no Operation lineage, doing pendRQ with "
814: + " an IllegalStateException for "
815: + r);
816: pendRQ(r);
817: return;
818: }
819:
820: if (myLoggingService.isDebugEnabled()) {
821: myLoggingService.debug(getAgentIdentifier()
822: + " findServiceWithDistributedYP: "
823: + " using lineage based search - "
824: + " r.ypLineage = " + r.ypLineage);
825: }
826:
827: List lineageList = r.ypLineage.getList();
828: int listSize = lineageList.size();
829:
830: if (r.currentYPContext == null) {
831: r.currentYPContext = (listSize > 1) ? (String) lineageList
832: .get(listSize - 2)
833: : (String) r.ypLineage.getLeaf();
834: } else {
835: int index = lineageList.indexOf(r.currentYPContext);
836: if (index > 0) {
837: r.previousYPContext = r.currentYPContext;
838: r.currentYPContext = (String) lineageList
839: .get(index - 1);
840: } else {
841: // Reached the top of the lineage. Restart the search.
842: r.currentYPContext = (listSize > 1) ? (String) lineageList
843: .get(listSize - 2)
844: : (String) r.ypLineage.getLeaf();
845: }
846: }
847:
848: myRegistryQueryService.findServiceAndBinding(
849: (String) r.currentYPContext, r.rq,
850: new RegistryQueryService.CallbackWithContext() {
851: public void invoke(Object result) {
852: r.services = (Collection) result;
853: if (myLoggingService.isDebugEnabled()) {
854: myLoggingService
855: .debug(getAgentIdentifier()
856: + " results = "
857: + result + " for "
858: + r.currentYPContext);
859: }
860: flush();
861: }
862:
863: public void handle(Exception e) {
864: r.exception = e;
865: if (myLoggingService.isDebugEnabled()) {
866: myLoggingService
867: .debug(
868: getAgentIdentifier()
869: + " failed during query of "
870: + r.queryRequest
871: + " context = "
872: + r.currentYPContext,
873: e);
874: }
875: flush();
876: }
877:
878: public void setNextContext(Object context) {
879: if (myLoggingService.isDebugEnabled()) {
880: myLoggingService
881: .debug(getAgentIdentifier()
882: + " getNextContext() for "
883: + r.currentYPContext
884: + " returned "
885: + context);
886: }
887:
888: if (context == null) {
889: // Restart search
890: r.currentYPContext = null;
891: r.getNextContextFailed = true;
892: }
893: }
894:
895: private void flush() {
896: pendRQ(r);
897: }
898: });
899: }
900: }
901:
902: public class QueryAlarm implements Alarm {
903: private long expiresAt;
904: private boolean expired = false;
905: private RQ rq = null;
906:
907: public QueryAlarm(RQ rq, long expirationTime) {
908: expiresAt = expirationTime;
909: this .rq = rq;
910: }
911:
912: public long getExpirationTime() {
913: return expiresAt;
914: }
915:
916: public synchronized void expire() {
917: if (!expired) {
918: if (myLoggingService.isDebugEnabled()) {
919: myLoggingService.debug("expire: alarm = " + this
920: + " for RQ = " + rq);
921: }
922: synchronized (myOutstandingAlarms) {
923: myOutstandingAlarms.remove(this );
924: }
925: expired = true;
926: rq.complete = false;
927: postRQ(rq);
928: }
929: }
930:
931: public boolean hasExpired() {
932: return expired;
933: }
934:
935: public synchronized boolean cancel() {
936: boolean was = expired;
937: expired = true;
938: synchronized (myOutstandingAlarms) {
939: myOutstandingAlarms.remove(this );
940: }
941: return was;
942: }
943:
944: public String toString() {
945: return "<QueryAlarm " + expiresAt
946: + (expired ? "(Expired) " : " ")
947: + rq.query.getRole() + " "
948: + "for MatchmakerStubPlugin at "
949: + getAgentIdentifier() + ">";
950: }
951: }
952: }
|