Source Code Cross Referenced for MatchmakerStubPlugin.java in  » Science » Cougaar12_4 » org » cougaar » servicediscovery » plugin » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.servicediscovery.plugin 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * <copyright>
003:         *  
004:         *  Copyright 2002-2004 BBNT Solutions, LLC
005:         *  under sponsorship of the Defense Advanced Research Projects
006:         *  Agency (DARPA).
007:         * 
008:         *  You can redistribute this software and/or modify it under the
009:         *  terms of the Cougaar Open Source License as published on the
010:         *  Cougaar Open Source Website (www.cougaar.org).
011:         * 
012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023:         *  
024:         * </copyright>
025:         */
026:
027:        package org.cougaar.servicediscovery.plugin;
028:
029:        import java.util.ArrayList;
030:        import java.util.Collection;
031:        import java.util.Collections;
032:        import java.util.Date;
033:        import java.util.Iterator;
034:        import java.util.List;
035:
036:        import org.cougaar.core.agent.service.alarm.Alarm;
037:        import org.cougaar.core.blackboard.IncrementalSubscription;
038:        import org.cougaar.core.service.AgentIdentificationService;
039:        import org.cougaar.core.service.LoggingService;
040:        import org.cougaar.core.service.QuiescenceReportService;
041:        import org.cougaar.planning.ldm.plan.Role;
042:        import org.cougaar.planning.plugin.legacy.SimplePlugin;
043:        import org.cougaar.servicediscovery.Constants;
044:        import org.cougaar.servicediscovery.description.Lineage;
045:        import org.cougaar.servicediscovery.description.MMRoleQuery;
046:        import org.cougaar.servicediscovery.description.ScoredServiceDescriptionImpl;
047:        import org.cougaar.servicediscovery.description.ServiceClassification;
048:        import org.cougaar.servicediscovery.description.ServiceClassificationImpl;
049:        import org.cougaar.servicediscovery.description.ServiceInfo;
050:        import org.cougaar.servicediscovery.service.RegistryQueryService;
051:        import org.cougaar.servicediscovery.transaction.MMQueryRequest;
052:        import org.cougaar.servicediscovery.transaction.MMQueryRequestImpl;
053:        import org.cougaar.servicediscovery.transaction.RegistryQuery;
054:        import org.cougaar.servicediscovery.transaction.RegistryQueryImpl;
055:        import org.cougaar.servicediscovery.util.UDDIConstants;
056:        import org.cougaar.util.TimeSpan;
057:        import org.cougaar.util.UnaryPredicate;
058:
059:        /**
060:         *
061:         * Query the YellowPages for possible service providers
062:         *
063:         */
064:        public class MatchmakerStubPlugin extends SimplePlugin {
065:            private static int WARNING_SUPPRESSION_INTERVAL = 2;
066:            private long myWarningCutoffTime = 0;
067:            private static final String QUERY_GRACE_PERIOD_PROPERTY = "org.cougaar.servicediscovery.plugin.QueryGracePeriod";
068:
069:            private boolean myDistributedYPServers;
070:            private String myAgentName;
071:            private LoggingService myLoggingService;
072:            private RegistryQueryService myRegistryQueryService;
073:            private QuiescenceReportService myQuiescenceReportService;
074:            private AgentIdentificationService myAgentIdentificationService;
075:            private IncrementalSubscription myClientRequestSubscription;
076:            private IncrementalSubscription myLineageSubscription;
077:
078:            // pending RQs are returned RQ which haven't been consumed by the plugin yet
079:            private ArrayList myPendingRQs = new ArrayList();
080:
081:            // Outstanding RQ are those which have been issued but have not yet returned
082:            // used to support quiescence.
083:            private ArrayList myOutstandingRQs = new ArrayList();
084:
085:            // Outstanding alarms (any means non-quiescent)
086:            // used to support quiescence.
087:            private ArrayList myOutstandingAlarms = new ArrayList();
088:
089:            private UnaryPredicate myLineagePredicate = new UnaryPredicate() {
090:                public boolean execute(Object o) {
091:                    return (o instanceof  Lineage);
092:                }
093:            };
094:
095:            private UnaryPredicate myQueryRequestPredicate = new UnaryPredicate() {
096:                public boolean execute(Object o) {
097:                    if (o instanceof  MMQueryRequest) {
098:                        MMQueryRequest qr = (MMQueryRequest) o;
099:                        return (qr.getQuery() instanceof  MMRoleQuery);
100:                    }
101:                    return false;
102:                }
103:            };
104:
105:            public void load() {
106:                super .load();
107:
108:                myLoggingService = (LoggingService) getBindingSite()
109:                        .getServiceBroker().getService(this ,
110:                                LoggingService.class, null);
111:                if (myLoggingService == null) {
112:                    myLoggingService = LoggingService.NULL;
113:                }
114:
115:                myRegistryQueryService = (RegistryQueryService) getBindingSite()
116:                        .getServiceBroker().getService(this ,
117:                                RegistryQueryService.class, null);
118:                myAgentIdentificationService = (AgentIdentificationService) getBindingSite()
119:                        .getServiceBroker().getService(this ,
120:                                AgentIdentificationService.class, null);
121:
122:                // Set up the QuiescenceReportService so that while waiting for the YP and
123:                // alarms we don't go quiescent by mistake
124:                myQuiescenceReportService = (QuiescenceReportService) getBindingSite()
125:                        .getServiceBroker().getService(this ,
126:                                QuiescenceReportService.class, null);
127:
128:                if (myQuiescenceReportService != null)
129:                    myQuiescenceReportService
130:                            .setAgentIdentificationService(myAgentIdentificationService);
131:
132:                if (myRegistryQueryService == null)
133:                    throw new RuntimeException(
134:                            "Unable to obtain RegistryQuery service");
135:            }
136:
137:            public void unload() {
138:                if (myRegistryQueryService != null) {
139:                    getBindingSite().getServiceBroker().releaseService(this ,
140:                            RegistryQueryService.class, myRegistryQueryService);
141:                    myRegistryQueryService = null;
142:                }
143:
144:                if (myQuiescenceReportService != null) {
145:                    getBindingSite().getServiceBroker().releaseService(this ,
146:                            QuiescenceReportService.class,
147:                            myQuiescenceReportService);
148:                    myQuiescenceReportService = null;
149:                }
150:
151:                if (myAgentIdentificationService != null) {
152:                    getBindingSite().getServiceBroker().releaseService(this ,
153:                            AgentIdentificationService.class,
154:                            myAgentIdentificationService);
155:                    myAgentIdentificationService = null;
156:                }
157:
158:                if ((myLoggingService != null)
159:                        && (myLoggingService != LoggingService.NULL)) {
160:                    getBindingSite().getServiceBroker().releaseService(this ,
161:                            LoggingService.class, myLoggingService);
162:                    myLoggingService = null;
163:                }
164:                super .unload();
165:            }
166:
167:            protected void setupSubscriptions() {
168:                myAgentName = getBindingSite().getAgentIdentifier().toString();
169:
170:                myClientRequestSubscription = (IncrementalSubscription) subscribe(myQueryRequestPredicate);
171:                myLineageSubscription = (IncrementalSubscription) subscribe(myLineagePredicate);
172:
173:                Collection params = getDelegate().getParameters();
174:                if (params.size() > 0) {
175:                    myDistributedYPServers = Boolean.valueOf(
176:                            (String) params.iterator().next()).booleanValue();
177:                } else {
178:                    myDistributedYPServers = false;
179:                }
180:
181:                if (didRehydrate()) {
182:                    // Requeue any unanswered MMQueries
183:                    for (Iterator iterator = myClientRequestSubscription
184:                            .iterator(); iterator.hasNext();) {
185:                        MMQueryRequest queryRequest = (MMQueryRequest) iterator
186:                                .next();
187:                        MMRoleQuery query = (MMRoleQuery) queryRequest
188:                                .getQuery();
189:                        Collection result = queryRequest.getResult();
190:
191:                        if ((!query.getObsolete())
192:                                && ((result == null) || (result.isEmpty()))) {
193:                            if (myLoggingService.isDebugEnabled()) {
194:                                myLoggingService
195:                                        .debug("setupSubscription: putting "
196:                                                + queryRequest
197:                                                + " back on queue after rehydration.");
198:                                addRQ(queryRequest);
199:                            }
200:                        }
201:                    }
202:                }
203:            }
204:
205:            protected void execute() {
206:                if (myClientRequestSubscription.hasChanged()) {
207:
208:                    for (Iterator i = myClientRequestSubscription
209:                            .getAddedCollection().iterator(); i.hasNext();) {
210:                        addRQ((MMQueryRequest) i.next());
211:                    }
212:                }
213:
214:                RQ r;
215:                while ((r = getPendingRQ()) != null) {
216:                    if (!r.query.getObsolete()) {
217:                        if (r.exception == null) {
218:                            handleResponse(r);
219:                        } else {
220:                            handleException(r);
221:                        }
222:                    } else {
223:                        // Don't bother to process the response to obsolete queries
224:                        if (myLoggingService.isDebugEnabled()) {
225:                            myLoggingService
226:                                    .debug("execute: ignoring obsolete registry query - "
227:                                            + r);
228:                        }
229:                    }
230:                }
231:
232:                handleQuiescenceReport();
233:            }
234:
235:            protected void handleException(RQ r) {
236:                // Log the error and try again later
237:                retryErrorLog(r, getAgentIdentifier()
238:                        + " Exception querying registry for "
239:                        + r.query.getRole().toString() + ", try again later.",
240:                        r.exception);
241:                r.exception = null;
242:            }
243:
244:            private void retryErrorLog(RQ r, String message) {
245:                retryErrorLog(r, message, null);
246:            }
247:
248:            // When an error occurs, but we'll be retrying later, treat it as a DEBUG
249:            // at first. After a while it becomes an error.
250:            private void retryErrorLog(RQ r, String message, Throwable e) {
251:                int rand = (int) (Math.random() * 10000) + 1000;
252:                QueryAlarm alarm = new QueryAlarm(r, getAlarmService()
253:                        .currentTimeMillis()
254:                        + rand);
255:                getAlarmService().addAlarm(alarm);
256:                // Alarms silently make us non-quiescent -- so keep track of when we have any
257:                synchronized (myOutstandingAlarms) {
258:                    myOutstandingAlarms.add(alarm);
259:                }
260:
261:                if (myLoggingService.isDebugEnabled()) {
262:                    myLoggingService.debug(getAgentIdentifier()
263:                            + " adding a QueryAlarm for r " + r + " alarm - "
264:                            + alarm);
265:                }
266:
267:                if (System.currentTimeMillis() > getWarningCutoffTime()) {
268:                    if (e == null)
269:                        myLoggingService.error(getAgentIdentifier() + message);
270:                    else
271:                        myLoggingService.error(getAgentIdentifier() + message,
272:                                e);
273:                } else if (myLoggingService.isDebugEnabled()) {
274:                    if (e == null)
275:                        myLoggingService.debug(getAgentIdentifier() + message);
276:                    else
277:                        myLoggingService.debug(getAgentIdentifier() + message,
278:                                e);
279:                }
280:            }
281:
282:            protected void addRQ(MMQueryRequest queryRequest) {
283:                MMRoleQuery query = (MMRoleQuery) queryRequest.getQuery();
284:
285:                RegistryQuery rq = new RegistryQueryImpl();
286:                RQ r;
287:
288:                // Find all service providers for specifed Role
289:                ServiceClassification roleSC = new ServiceClassificationImpl(
290:                        query.getRole().toString(), query.getRole().toString(),
291:                        UDDIConstants.MILITARY_SERVICE_SCHEME);
292:                rq.addServiceClassification(roleSC);
293:
294:                if (myDistributedYPServers) {
295:                    r = new RQ(queryRequest, query, rq);
296:                } else {
297:                    r = new RQ(queryRequest, query, rq);
298:                }
299:
300:                postRQ(r);
301:            }
302:
303:            protected void handleResponse(RQ r) {
304:                MMQueryRequest queryRequest = r.queryRequest;
305:                MMRoleQuery query = r.query;
306:
307:                Collection services = r.services;
308:
309:                if (myLoggingService.isDebugEnabled()) {
310:                    myLoggingService.debug(myAgentName
311:                            + " registry query result size is : "
312:                            + services.size() + " for query: "
313:                            + query.getRole().toString() + " "
314:                            + new Date(query.getTimeSpan().getStartTime())
315:                            + " to "
316:                            + new Date(query.getTimeSpan().getEndTime()));
317:                }
318:
319:                ArrayList scoredServiceDescriptions = new ArrayList();
320:                for (Iterator iter = services.iterator(); iter.hasNext();) {
321:                    ServiceInfo serviceInfo = (ServiceInfo) iter.next();
322:                    int score = query.getServiceInfoScorer().scoreServiceInfo(
323:                            serviceInfo);
324:
325:                    if (score >= 0) {
326:                        scoredServiceDescriptions
327:                                .add(new ScoredServiceDescriptionImpl(score,
328:                                        serviceInfo));
329:                        if (myLoggingService.isDebugEnabled()) {
330:                            myLoggingService.debug(myAgentName
331:                                    + ":execute: adding Provider name: "
332:                                    + serviceInfo.getProviderName()
333:                                    + " Service name: "
334:                                    + serviceInfo.getServiceName()
335:                                    + " Service score: " + score);
336:                        }
337:                    } else {
338:                        if (myLoggingService.isDebugEnabled()) {
339:                            myLoggingService.debug(myAgentName
340:                                    + ":execute: ignoring Provider name: "
341:                                    + serviceInfo.getProviderName()
342:                                    + " Service name: "
343:                                    + serviceInfo.getServiceName()
344:                                    + " Service score: " + score);
345:                        }
346:                    }
347:                }
348:
349:                if (scoredServiceDescriptions.isEmpty()) {
350:                    if ((myDistributedYPServers) && (!r.getNextContextFailed)) {
351:                        if (myLoggingService.isDebugEnabled()) {
352:                            myLoggingService.debug(myAgentName
353:                                    + " no matching provider for "
354:                                    + query.getRole() + " in "
355:                                    + r.currentYPContext
356:                                    + " retrying in next context.");
357:                        }
358:                        postRQ(r);
359:                    } else {
360:                        // Couldn't find another YPServer to search
361:                        retryErrorLog(r, myAgentName
362:                                + " unable to find provider for "
363:                                + query.getRole()
364:                                + ", publishing empty query result. "
365:                                + "Will try query again later.");
366:
367:                    }
368:                } else {
369:                    Collections.sort(scoredServiceDescriptions);
370:                }
371:
372:                ((MMQueryRequestImpl) queryRequest)
373:                        .setResult(scoredServiceDescriptions);
374:                ((MMQueryRequestImpl) queryRequest).setQueryCount(queryRequest
375:                        .getQueryCount() + 1);
376:                getBlackboardService().publishChange(queryRequest);
377:
378:                if (myLoggingService.isDebugEnabled()) {
379:                    myLoggingService.debug(myAgentName
380:                            + ": publishChanged query");
381:                }
382:            }
383:
384:            protected void handleQuiescenceReport() {
385:                // Whenever we submit a query to the YP we go off into the ether
386:                // So if there are outstanding YP queries or alarms, then mark the fact that we are not done yet
387:                // so that Quiescence stuff doesnt decide we're done prematurely early
388:                // Note that myPendingRQs should _always_ be empty at this point. And we'll only have oustandingAlarms
389:                // if there were exceptions talking to the YP.
390:
391:                if (myQuiescenceReportService != null) {
392:
393:                    // Check if done with YP queries in synch blocks
394:                    // since callbacks may be running
395:                    boolean noOutStandingRQs = false;
396:                    synchronized (myOutstandingRQs) {
397:
398:                        // Remove any queries marked as obsolete
399:                        for (Iterator iterator = myOutstandingRQs.iterator(); iterator
400:                                .hasNext();) {
401:                            RQ outstandingRQ = (RQ) iterator.next();
402:                            if (outstandingRQ.query.getObsolete()) {
403:                                iterator.remove();
404:
405:                                if (myLoggingService.isDebugEnabled()) {
406:                                    myLoggingService
407:                                            .debug("handleQuiescenceReport: removing obsolete RQ - "
408:                                                    + outstandingRQ
409:                                                    + " - from myOutstandingRQs.");
410:                                }
411:                            }
412:                        }
413:                        noOutStandingRQs = myOutstandingRQs.isEmpty();
414:                    }
415:
416:                    boolean noPendRQs = false;
417:                    synchronized (myPendingRQs) {
418:                        noPendRQs = myPendingRQs.isEmpty();
419:                    }
420:
421:                    boolean noOutstandingAlarms = false;
422:                    synchronized (myOutstandingAlarms) {
423:                        noOutstandingAlarms = myOutstandingAlarms.isEmpty();
424:                    }
425:
426:                    if (noOutStandingRQs && noPendRQs && noOutstandingAlarms) {
427:                        // Nothing on the lists and no outstanding alarms - so we're done
428:                        myQuiescenceReportService.setQuiescentState();
429:                        resetWarningCutoffTime();
430:                        if (myLoggingService.isInfoEnabled())
431:                            myLoggingService
432:                                    .info(myAgentName
433:                                            + " finished all YP queries. Now quiescent.");
434:
435:                    } else {
436:                        // Some query waiting for an answer, or waiting for this Plugin to 
437:                        // handle it, or waiting to retry a query
438:                        // We're not done
439:
440:                        myQuiescenceReportService.clearQuiescentState();
441:                        if (myLoggingService.isInfoEnabled())
442:                            myLoggingService
443:                                    .info(myAgentName
444:                                            + " has outstanding YP queries or answers. "
445:                                            + "Not quiescent.");
446:                        if (myLoggingService.isDebugEnabled()) {
447:                            // Get the toStrings in synch blocks since callbacks
448:                            // may currently be executing
449:                            String outstandingRQs = "";
450:                            String pendingRQs = "";
451:                            String outstandingAlarms = "";
452:                            synchronized (myOutstandingRQs) {
453:                                outstandingRQs = myOutstandingRQs.size() + ". "
454:                                        + myOutstandingRQs.toString();
455:                            }
456:                            synchronized (myPendingRQs) {
457:                                pendingRQs = myPendingRQs.size() + ". "
458:                                        + myPendingRQs.toString();
459:                            }
460:
461:                            synchronized (myOutstandingAlarms) {
462:                                outstandingAlarms = myOutstandingAlarms.size()
463:                                        + ". " + myOutstandingAlarms.toString();
464:                            }
465:
466:                            myLoggingService
467:                                    .debug("\tYP questions outstanding: "
468:                                            +
469:                                            // AMH: Actually print the outstanding RQs
470:                                            //				 myOutstandingRQs.size() + 
471:                                            outstandingRQs
472:                                            + ". YP answers to process: "
473:                                            + pendingRQs
474:                                            +
475:                                            //				 myPendingRQs.size() + 
476:                                            ". Outstanding alarms: "
477:                                            + outstandingAlarms);
478:                        }
479:                    }
480:                }
481:            }
482:
483:            protected long getWarningCutoffTime() {
484:                if (myWarningCutoffTime == 0) {
485:                    WARNING_SUPPRESSION_INTERVAL = Integer.getInteger(
486:                            QUERY_GRACE_PERIOD_PROPERTY,
487:                            WARNING_SUPPRESSION_INTERVAL).intValue();
488:                    myWarningCutoffTime = System.currentTimeMillis()
489:                            + WARNING_SUPPRESSION_INTERVAL * 60000;
490:                }
491:
492:                return myWarningCutoffTime;
493:            }
494:
495:            protected void resetWarningCutoffTime() {
496:                myWarningCutoffTime = -1;
497:            }
498:
499:            protected Lineage getLineage(int lineageType, TimeSpan timeSpan) {
500:                if (myLoggingService.isDebugEnabled()) {
501:                    myLoggingService.debug(myAgentName
502:                            + ": getLineage() requested lineage of type "
503:                            + Lineage.typeToRole(lineageType)
504:                            + new Date(timeSpan.getStartTime()) + " - "
505:                            + new Date(timeSpan.getEndTime()));
506:                }
507:
508:                for (Iterator iterator = myLineageSubscription.iterator(); iterator
509:                        .hasNext();) {
510:                    Lineage lineage = (Lineage) iterator.next();
511:                    if (lineage.getType() == lineageType) {
512:                        Collection lineageTimeSpans = lineage.getSchedule()
513:                                .getOverlappingScheduleElements(
514:                                        timeSpan.getStartTime(),
515:                                        timeSpan.getEndTime());
516:
517:                        if (!lineageTimeSpans.isEmpty()) {
518:                            TimeSpan lineageTimeSpan = (TimeSpan) lineageTimeSpans
519:                                    .iterator().next();
520:                            if ((lineageTimeSpan.getStartTime() <= timeSpan
521:                                    .getStartTime())
522:                                    && (lineageTimeSpan.getEndTime() >= timeSpan
523:                                            .getEndTime())) {
524:
525:                                if (myLoggingService.isDebugEnabled()) {
526:                                    myLoggingService.debug(myAgentName
527:                                            + ": getLineage() returning "
528:                                            + lineage);
529:                                }
530:                                return lineage;
531:                            } else {
532:                                myLoggingService.warn(myAgentName
533:                                        + ": getLineage() requested timeSpan "
534:                                        + new Date(timeSpan.getStartTime())
535:                                        + " - "
536:                                        + new Date(timeSpan.getEndTime())
537:                                        + " spans more than one lineage.");
538:                                return null;
539:                            }
540:                        }
541:                    }
542:                }
543:
544:                myLoggingService.error(myAgentName
545:                        + ": getLineage() requested lineage of type "
546:                        + Lineage.typeToRole(lineageType)
547:                        + new Date(timeSpan.getStartTime()) + " - "
548:                        + new Date(timeSpan.getEndTime())
549:                        + " does not match any lineage.\n"
550:                        + myLineageSubscription);
551:                return null;
552:            }
553:
554:            private class RQ {
555:                MMQueryRequest queryRequest;
556:                MMRoleQuery query;
557:                RegistryQuery rq;
558:                Lineage ypLineage;
559:
560:                Collection services;
561:                Exception exception;
562:                boolean complete = false;
563:                Object previousYPContext = null;
564:                Object currentYPContext = null;
565:                boolean getNextContextFailed = false;
566:
567:                RQ(MMQueryRequest queryRequest, MMRoleQuery query,
568:                        RegistryQuery rq) {
569:                    this .queryRequest = queryRequest;
570:                    this .query = query;
571:                    this .rq = rq;
572:                    this .ypLineage = null;
573:                }
574:
575:                // Verbose toString used when printing myOutstandingRQs in handleQuiescenceReport
576:                public String toString() {
577:                    return "RQ for query <" + queryRequest.getUID()
578:                            + ": ResultCode " + queryRequest.getResultCode()
579:                            + ", query: " + query + "> using lineage "
580:                            + ypLineage + ". Has exception? "
581:                            + (exception == null ? "No" : "Yes")
582:                            + ". Complete? " + complete
583:                            + " getNextContextFailed? " + getNextContextFailed;
584:                }
585:            }
586:
587:            // issue a async request
588:            private void postRQ(final RQ r) {
589:                // Don't bother to process obsolete queries
590:                if (r.query.getObsolete()) {
591:                    if (myLoggingService.isDebugEnabled()) {
592:                        myLoggingService
593:                                .debug("postRQ: ignoring obsolete MMQueryRequest - "
594:                                        + r.queryRequest);
595:                    }
596:                    // Update quiescence
597:                    handleQuiescenceReport();
598:                    return;
599:                }
600:
601:                if (myLoggingService.isDebugEnabled()) {
602:                    myLoggingService.debug(getAgentIdentifier() + ": posting "
603:                            + r + " (" + r.rq + ")");
604:                }
605:
606:                synchronized (myOutstandingRQs) {
607:                    myOutstandingRQs.add(r);
608:                }
609:
610:                if (myDistributedYPServers) {
611:                    findServiceWithDistributedYP(r);
612:                } else {
613:                    findServiceWithCentralizedYP(r);
614:                }
615:            }
616:
617:            // note an async response and wake the plugin
618:            private void pendRQ(RQ r) {
619:                if (myLoggingService.isDebugEnabled()) {
620:                    myLoggingService.debug(getAgentIdentifier() + " pending "
621:                            + r + " (" + r.rq + ")");
622:                }
623:                r.complete = true;
624:                synchronized (myOutstandingRQs) {
625:                    myOutstandingRQs.remove(r);
626:                }
627:                synchronized (myPendingRQs) {
628:                    myPendingRQs.add(r);
629:                }
630:                wake(); // tell the plugin to wake up
631:            }
632:
633:            // get a pending RQ (or null) so that we can deal with it
634:            private RQ getPendingRQ() {
635:                RQ r = null;
636:                synchronized (myPendingRQs) {
637:                    if (!myPendingRQs.isEmpty()) {
638:                        r = (RQ) myPendingRQs.remove(0); // treat like a fifo
639:                        if (myLoggingService.isDebugEnabled()) {
640:                            myLoggingService.debug(getAgentIdentifier()
641:                                    + " retrieving " + r + " (" + r.rq + ")");
642:                        }
643:                    }
644:                }
645:                return r;
646:            }
647:
648:            private boolean useYPCommunitySearchPath(final RQ r) {
649:                Lineage opconLineage = getLineage(Lineage.OPCON, r.query
650:                        .getTimeSpan());
651:                Lineage adconLineage = getLineage(Lineage.ADCON, r.query
652:                        .getTimeSpan());
653:
654:                if (myLoggingService.isDebugEnabled()) {
655:                    myLoggingService.debug(getAgentIdentifier()
656:                            + ": useYPCommunitySearchPath() " + " timeSpan = ("
657:                            + new Date(r.query.getTimeSpan().getStartTime())
658:                            + ", "
659:                            + new Date(r.query.getTimeSpan().getEndTime())
660:                            + ")  adconLineage = " + adconLineage
661:                            + " opconLineage = " + opconLineage);
662:                    if ((opconLineage != null) && (adconLineage != null)) {
663:                        myLoggingService
664:                                .debug(getAgentIdentifier()
665:                                        + ": useYPCommunitySearchPath()() "
666:                                        + " adconLineage.getList() = "
667:                                        + adconLineage.getList()
668:                                        + " opconLineage.getList() = "
669:                                        + opconLineage.getList()
670:                                        + " opconLineage.getList().equals(adconLineage.getList()) == "
671:                                        + opconLineage.getList().equals(
672:                                                adconLineage.getList()));
673:                    }
674:                }
675:
676:                if ((adconLineage == null) || (opconLineage == null)) {
677:                    if (myLoggingService.isDebugEnabled()) {
678:                        myLoggingService
679:                                .debug(getAgentIdentifier()
680:                                        + ": useYPCommunitySearchPath() "
681:                                        + " returning false - "
682:                                        + "no Administrative or Operational lineage."
683:                                        + " ADCON lineage = "
684:                                        + adconLineage
685:                                        + " OPCON lineage = "
686:                                        + opconLineage
687:                                        + " for "
688:                                        + new Date(r.query.getTimeSpan()
689:                                                .getStartTime())
690:                                        + " to "
691:                                        + new Date(r.query.getTimeSpan()
692:                                                .getEndTime()));
693:                    }
694:
695:                    return false;
696:                } else {
697:                    return (opconLineage.getList().equals(adconLineage
698:                            .getList()));
699:                }
700:            }
701:
702:            private void findServiceWithCentralizedYP(final RQ r) {
703:                if (myLoggingService.isDebugEnabled()) {
704:                    myLoggingService.debug(getAgentIdentifier()
705:                            + " findServiceWithCentralizedYP: r = " + r);
706:                }
707:
708:                myRegistryQueryService.findServiceAndBinding(r.rq,
709:                        new RegistryQueryService.Callback() {
710:                            public void invoke(Object result) {
711:                                r.services = (Collection) result;
712:                                if (myLoggingService.isDebugEnabled()) {
713:                                    myLoggingService.debug(getAgentIdentifier()
714:                                            + " results = " + result + " for "
715:                                            + r.currentYPContext);
716:                                }
717:                                flush();
718:                            }
719:
720:                            public void handle(Exception e) {
721:                                r.exception = e;
722:                                if (myLoggingService.isDebugEnabled()) {
723:                                    myLoggingService.debug(getAgentIdentifier()
724:                                            + " failed during query of "
725:                                            + r.queryRequest, e);
726:                                }
727:                                flush();
728:                            }
729:
730:                            private void flush() {
731:                                pendRQ(r);
732:                            }
733:                        });
734:            }
735:
736:            private void findServiceWithDistributedYP(final RQ r) {
737:                if (useYPCommunitySearchPath(r)) {
738:                    if (myLoggingService.isDebugEnabled()) {
739:                        myLoggingService.debug(getAgentIdentifier()
740:                                + " findServiceWithDistributedYP: "
741:                                + " using YPCommunity search.");
742:                    }
743:
744:                    r.ypLineage = null;
745:                    myRegistryQueryService.findServiceAndBinding(
746:                            r.currentYPContext, r.rq,
747:                            new RegistryQueryService.CallbackWithContext() {
748:                                public void invoke(Object result) {
749:                                    r.services = (Collection) result;
750:                                    if (myLoggingService.isDebugEnabled()) {
751:                                        myLoggingService
752:                                                .debug(getAgentIdentifier()
753:                                                        + " results = "
754:                                                        + result + " for "
755:                                                        + r.currentYPContext);
756:                                    }
757:                                    flush();
758:                                }
759:
760:                                public void handle(Exception e) {
761:                                    r.exception = e;
762:                                    if (myLoggingService.isDebugEnabled()) {
763:                                        myLoggingService
764:                                                .debug(
765:                                                        getAgentIdentifier()
766:                                                                + " failed during query of "
767:                                                                + r.queryRequest
768:                                                                + " context =  "
769:                                                                + r.currentYPContext,
770:                                                        e);
771:                                    }
772:                                    flush();
773:                                }
774:
775:                                public void setNextContext(Object context) {
776:                                    if (myLoggingService.isDebugEnabled()) {
777:                                        myLoggingService
778:                                                .debug(getAgentIdentifier()
779:                                                        + " previous YPContext "
780:                                                        + r.currentYPContext
781:                                                        + " current YPContext "
782:                                                        + context);
783:                                    }
784:                                    r.previousYPContext = r.currentYPContext;
785:                                    r.currentYPContext = context;
786:
787:                                    if (context == null) {
788:                                        r.getNextContextFailed = true;
789:                                    }
790:                                }
791:
792:                                private void flush() {
793:                                    pendRQ(r);
794:                                }
795:                            });
796:                } else {
797:                    r.ypLineage = getLineage(Lineage.OPCON, r.query
798:                            .getTimeSpan());
799:
800:                    if (r.ypLineage == null) {
801:                        String errorMessage = getAgentIdentifier()
802:                                + " no Operation lineage for "
803:                                + new Date(r.query.getTimeSpan().getStartTime())
804:                                + " to "
805:                                + new Date(r.query.getTimeSpan().getEndTime());
806:
807:                        // AMH: Don't retry here. Instead, let the while in execute()
808:                        // call handleException to do this.
809:                        r.exception = new IllegalStateException(errorMessage);
810:                        if (myLoggingService.isDebugEnabled())
811:                            myLoggingService
812:                                    .debug("findServiceWithDistributedYP: "
813:                                            + " no Operation lineage, doing pendRQ with "
814:                                            + " an IllegalStateException for "
815:                                            + r);
816:                        pendRQ(r);
817:                        return;
818:                    }
819:
820:                    if (myLoggingService.isDebugEnabled()) {
821:                        myLoggingService.debug(getAgentIdentifier()
822:                                + " findServiceWithDistributedYP: "
823:                                + " using lineage based search - "
824:                                + " r.ypLineage = " + r.ypLineage);
825:                    }
826:
827:                    List lineageList = r.ypLineage.getList();
828:                    int listSize = lineageList.size();
829:
830:                    if (r.currentYPContext == null) {
831:                        r.currentYPContext = (listSize > 1) ? (String) lineageList
832:                                .get(listSize - 2)
833:                                : (String) r.ypLineage.getLeaf();
834:                    } else {
835:                        int index = lineageList.indexOf(r.currentYPContext);
836:                        if (index > 0) {
837:                            r.previousYPContext = r.currentYPContext;
838:                            r.currentYPContext = (String) lineageList
839:                                    .get(index - 1);
840:                        } else {
841:                            // Reached the top of the lineage. Restart the search.
842:                            r.currentYPContext = (listSize > 1) ? (String) lineageList
843:                                    .get(listSize - 2)
844:                                    : (String) r.ypLineage.getLeaf();
845:                        }
846:                    }
847:
848:                    myRegistryQueryService.findServiceAndBinding(
849:                            (String) r.currentYPContext, r.rq,
850:                            new RegistryQueryService.CallbackWithContext() {
851:                                public void invoke(Object result) {
852:                                    r.services = (Collection) result;
853:                                    if (myLoggingService.isDebugEnabled()) {
854:                                        myLoggingService
855:                                                .debug(getAgentIdentifier()
856:                                                        + " results = "
857:                                                        + result + " for "
858:                                                        + r.currentYPContext);
859:                                    }
860:                                    flush();
861:                                }
862:
863:                                public void handle(Exception e) {
864:                                    r.exception = e;
865:                                    if (myLoggingService.isDebugEnabled()) {
866:                                        myLoggingService
867:                                                .debug(
868:                                                        getAgentIdentifier()
869:                                                                + " failed during query of "
870:                                                                + r.queryRequest
871:                                                                + " context =  "
872:                                                                + r.currentYPContext,
873:                                                        e);
874:                                    }
875:                                    flush();
876:                                }
877:
878:                                public void setNextContext(Object context) {
879:                                    if (myLoggingService.isDebugEnabled()) {
880:                                        myLoggingService
881:                                                .debug(getAgentIdentifier()
882:                                                        + " getNextContext() for "
883:                                                        + r.currentYPContext
884:                                                        + " returned "
885:                                                        + context);
886:                                    }
887:
888:                                    if (context == null) {
889:                                        // Restart search
890:                                        r.currentYPContext = null;
891:                                        r.getNextContextFailed = true;
892:                                    }
893:                                }
894:
895:                                private void flush() {
896:                                    pendRQ(r);
897:                                }
898:                            });
899:                }
900:            }
901:
902:            public class QueryAlarm implements  Alarm {
903:                private long expiresAt;
904:                private boolean expired = false;
905:                private RQ rq = null;
906:
907:                public QueryAlarm(RQ rq, long expirationTime) {
908:                    expiresAt = expirationTime;
909:                    this .rq = rq;
910:                }
911:
912:                public long getExpirationTime() {
913:                    return expiresAt;
914:                }
915:
916:                public synchronized void expire() {
917:                    if (!expired) {
918:                        if (myLoggingService.isDebugEnabled()) {
919:                            myLoggingService.debug("expire: alarm = " + this 
920:                                    + " for RQ = " + rq);
921:                        }
922:                        synchronized (myOutstandingAlarms) {
923:                            myOutstandingAlarms.remove(this );
924:                        }
925:                        expired = true;
926:                        rq.complete = false;
927:                        postRQ(rq);
928:                    }
929:                }
930:
931:                public boolean hasExpired() {
932:                    return expired;
933:                }
934:
935:                public synchronized boolean cancel() {
936:                    boolean was = expired;
937:                    expired = true;
938:                    synchronized (myOutstandingAlarms) {
939:                        myOutstandingAlarms.remove(this );
940:                    }
941:                    return was;
942:                }
943:
944:                public String toString() {
945:                    return "<QueryAlarm " + expiresAt
946:                            + (expired ? "(Expired) " : " ")
947:                            + rq.query.getRole() + " "
948:                            + "for MatchmakerStubPlugin at "
949:                            + getAgentIdentifier() + ">";
950:                }
951:            }
952:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.