Source Code Cross Referenced for StepRunnerPlugin.java in » Science » Cougaar12_4 » org » cougaar » core » examples » mobility » step » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.core.examples.mobility.step 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /* 
0002:         * <copyright>
0003:         *  
0004:         *  Copyright 2002-2004 BBNT Solutions, LLC
0005:         *  under sponsorship of the Defense Advanced Research Projects
0006:         *  Agency (DARPA).
0007:         * 
0008:         *  You can redistribute this software and/or modify it under the
0009:         *  terms of the Cougaar Open Source License as published on the
0010:         *  Cougaar Open Source Website (www.cougaar.org).
0011:         * 
0012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023:         *  
0024:         * </copyright>
0025:         */
0026:        package org.cougaar.core.examples.mobility.step;
0027:
0028:        import java.net.URI;
0029:        import java.util.ArrayList;
0030:        import java.util.Collection;
0031:        import java.util.Enumeration;
0032:        import java.util.HashMap;
0033:        import java.util.Iterator;
0034:        import java.util.List;
0035:        import java.util.Map;
0036:        import org.cougaar.core.agent.service.alarm.Alarm;
0037:        import org.cougaar.core.blackboard.CollectionSubscription;
0038:        import org.cougaar.core.blackboard.IncrementalSubscription;
0039:        import org.cougaar.core.component.ServiceBroker;
0040:        import org.cougaar.core.examples.mobility.ldm.Step;
0041:        import org.cougaar.core.examples.mobility.ldm.StepOptions;
0042:        import org.cougaar.core.examples.mobility.ldm.StepStatus;
0043:        import org.cougaar.core.mobility.Ticket;
0044:        import org.cougaar.core.mobility.ldm.MobilityFactory;
0045:        import org.cougaar.core.mobility.ldm.MoveAgent;
0046:        import org.cougaar.core.mts.MessageAddress;
0047:        import org.cougaar.core.node.NodeIdentificationService;
0048:        import org.cougaar.core.plugin.ComponentPlugin;
0049:        import org.cougaar.core.service.AgentIdentificationService;
0050:        import org.cougaar.core.service.AlarmService;
0051:        import org.cougaar.core.service.BlackboardService;
0052:        import org.cougaar.core.service.DomainService;
0053:        import org.cougaar.core.service.LoggingService;
0054:        import org.cougaar.core.service.wp.AddressEntry;
0055:        import org.cougaar.core.service.wp.WhitePagesService;
0056:        import org.cougaar.core.util.UID;
0057:        import org.cougaar.core.util.UniqueObject;
0058:        import org.cougaar.util.UnaryPredicate;
0059:
0060:        /**
0061:         * This plugin executes Steps where the step option's 
0062:         * target is the local agent.
0063:         * <p>
0064:         * MoveAgent objects are created by this plugin.
0065:         */
0066:        public class StepRunnerPlugin extends ComponentPlugin {
0067:
0068:            private MessageAddress todd; // prepended to most log messages; load() sets it to the same value as agentId
0069:            private MessageAddress agentId; // address read from the AgentIdentificationService in load()
0070:            private MessageAddress nodeId; // address read from the NodeIdentificationService in load()
0071:
0072:            private IncrementalSubscription stepSub; // subscription built from createStepPredicate(agentId)
0073:            private IncrementalSubscription moveSub; // subscription built from createMovePredicate(agentId)
0074:
0075:            private LoggingService log; // LoggingService.NULL when no logging service is available
0076:            private DomainService domain; // obtained in load(), released in unload()
0077:            private WhitePagesService wps; // obtained in load(), released in unload()
0078:
0079:            private MobilityFactory mobilityFactory; // the domain's "mobility" factory; used to create MoveAgent objects
0080:
0081:            // Non-persisted cache of blackboard objects, for quick access.
0082:            // On rehydration it is fully reconstructed from the blackboard.
0083:            private Map idToEntry = new HashMap(13); // ticket identifier -> Entry
0084:
0085:            // Pending alarms.
0086:            //
0087:            // FIXME could optimize to use treeset, keep fewer alarms, etc
0088:            // for now this is workable
0089:            private List pendingAlarms = new ArrayList(13);
0090:
0091:            public void load() {
0092:                super .load();
0093:
0094:                // get the logger
0095:                log = (LoggingService) getServiceBroker().getService(this ,
0096:                        LoggingService.class, null);
0097:                if (log == null) {
0098:                    log = LoggingService.NULL;
0099:                }
0100:
0101:                // get the agentId
0102:                AgentIdentificationService agentIdService = (AgentIdentificationService) getServiceBroker()
0103:                        .getService(this , AgentIdentificationService.class,
0104:                                null);
0105:                if (agentIdService == null) {
0106:                    throw new RuntimeException(
0107:                            "Unable to obtain agent-id service");
0108:                }
0109:                this .agentId = agentIdService.getMessageAddress();
0110:                getServiceBroker().releaseService(this ,
0111:                        AgentIdentificationService.class, agentIdService);
0112:                if (agentId == null) {
0113:                    throw new RuntimeException("Unable to obtain agent id");
0114:                }
0115:                todd = agentId;
0116:
0117:                // get the nodeId
0118:                NodeIdentificationService nodeIdService = (NodeIdentificationService) getServiceBroker()
0119:                        .getService(this , NodeIdentificationService.class, null);
0120:                if (nodeIdService == null) {
0121:                    throw new RuntimeException(
0122:                            "Unable to obtain node-id service");
0123:                }
0124:                this .nodeId = nodeIdService.getMessageAddress();
0125:                getServiceBroker().releaseService(this ,
0126:                        NodeIdentificationService.class, nodeIdService);
0127:                if (nodeId == null) {
0128:                    throw new RuntimeException("Unable to obtain node id");
0129:                }
0130:
0131:                // get the mobility domain and factory
0132:                this .domain = (DomainService) getServiceBroker().getService(
0133:                        this , DomainService.class, null);
0134:                if (domain == null) {
0135:                    throw new RuntimeException(
0136:                            "Unable to obtain domain service");
0137:                }
0138:                this .mobilityFactory = (MobilityFactory) domain
0139:                        .getFactory("mobility");
0140:                if (mobilityFactory == null) {
0141:                    throw new RuntimeException(
0142:                            "Mobility factory (and domain) not enabled");
0143:                }
0144:
0145:                // get the white pages service
0146:                this .wps = (WhitePagesService) getServiceBroker().getService(
0147:                        this , WhitePagesService.class, null);
0148:                if (wps == null) {
0149:                    throw new RuntimeException(
0150:                            "Unable to obtain white pages service");
0151:                }
0152:
0153:                if (log.isDebugEnabled()) {
0154:                    log.debug(todd + "Loaded");
0155:                }
0156:            }
0157:
0158:            public void unload() {
0159:                if (wps != null) {
0160:                    getServiceBroker().releaseService(this ,
0161:                            WhitePagesService.class, wps);
0162:                    wps = null;
0163:                }
0164:                if (domain != null) {
0165:                    getServiceBroker().releaseService(this ,
0166:                            DomainService.class, domain);
0167:                    domain = null;
0168:                }
0169:                if ((log != null) && (log != LoggingService.NULL)) {
0170:                    getServiceBroker().releaseService(this ,
0171:                            LoggingService.class, log);
0172:                    log = LoggingService.NULL;
0173:                }
0174:                super .unload();
0175:            }
0176:
0177:            protected void setupSubscriptions() {
0178:                // subscribe to steps with a target matching this agent
0179:                UnaryPredicate stepPred = createStepPredicate(agentId);
0180:                stepSub = (IncrementalSubscription) blackboard
0181:                        .subscribe(stepPred);
0182:
0183:                // subscribe to our own move requests
0184:                UnaryPredicate movePred = createMovePredicate(agentId);
0185:                moveSub = (IncrementalSubscription) blackboard
0186:                        .subscribe(movePred);
0187:
0188:                if (blackboard.didRehydrate()) {
0189:                    // recreate cache from blackboard
0190:                    recreateEntries();
0191:                } else {
0192:                    // create new cache
0193:                }
0194:            }
0195:
0196:            protected void execute() {
0197:                if (log.isDebugEnabled()) {
0198:                    log.debug(todd + "Execute");
0199:                }
0200:
0201:                // watch steps
0202:                if (stepSub.hasChanged()) {
0203:                    //   added steps
0204:                    Enumeration en = stepSub.getAddedList();
0205:                    while (en.hasMoreElements()) {
0206:                        Step step = (Step) en.nextElement();
0207:                        StepStatus status = step.getStatus();
0208:                        // validate step status
0209:                        if (status.getState() != StepStatus.UNSEEN) {
0210:                            if (log.isErrorEnabled()) {
0211:                                log.error(todd + "Newly added step "
0212:                                        + step.getUID()
0213:                                        + " with prior status: " + status);
0214:                            }
0215:                        }
0216:                        addStep(step);
0217:                    }
0218:                    //   ignore changes: only this plugin can modify steps
0219:                    //   removed steps
0220:                    en = stepSub.getRemovedList();
0221:                    while (en.hasMoreElements()) {
0222:                        Step s = (Step) en.nextElement();
0223:                        removeStep(s);
0224:                    }
0225:                }
0226:
0227:                // watch move-reqs
0228:                if (moveSub.hasChanged()) {
0229:                    // ignore adds: this plugin created them
0230:                    // changes fill in step details
0231:                    Enumeration en = moveSub.getChangedList();
0232:                    while (en.hasMoreElements()) {
0233:                        MoveAgent ma = (MoveAgent) en.nextElement();
0234:                        updateMove(ma);
0235:                    }
0236:                    // ignore removes: this plugin did it
0237:                }
0238:
0239:                // check alarms
0240:                List l = getDueTicketIds();
0241:                if (l != null) {
0242:                    handleDueTicketIds(l);
0243:                }
0244:            }
0245:
0246:            private void recreateEntries() {
0247:                // these should be empty
0248:                idToEntry.clear();
0249:                cancelAlarms();
0250:
0251:                // recreate from blackboard contents
0252:                Collection c = stepSub.getCollection();
0253:                if (!(c.isEmpty())) {
0254:                    int n = c.size();
0255:                    Iterator iter = c.iterator();
0256:                    for (int i = 0; i < n; i++) {
0257:                        Step step = (Step) iter.next();
0258:                        switch (step.getStatus().getState()) {
0259:                        case StepStatus.UNSEEN:
0260:                        case StepStatus.PAUSED:
0261:                        case StepStatus.RUNNING:
0262:                            addStep(step);
0263:                            break;
0264:                        case StepStatus.SUCCESS:
0265:                        case StepStatus.FAILURE:
0266:                        case StepStatus.TIMEOUT: {
0267:                            // completed, but we still need it in the
0268:                            // table for later remote removal
0269:                            StepOptions options = step.getOptions();
0270:                            Ticket ticket = options.getTicket();
0271:                            Object id = ticket.getIdentifier();
0272:                            MoveAgent ma = findMove(id);
0273:                            Entry entry = new Entry(ticket, null, step);
0274:                            entry.moveAgent = ma;
0275:                            idToEntry.put(id, entry);
0276:                            if (log.isDebugEnabled()) {
0277:                                log.debug("Re-added entry "
0278:                                        + id
0279:                                        + " with move "
0280:                                        + ((ma != null) ? ma.getUID()
0281:                                                .toString() : "null"));
0282:                            }
0283:                        }
0284:                            break;
0285:                        }
0286:                    }
0287:                }
0288:
0289:                // remove unreferenced MoveAgent entries?
0290:            }
0291:
0292:            private void addStep(Step step) {
0293:
0294:                // step status is typically UNSEEN
0295:                // on restart it can be PAUSED or RUNNING
0296:
0297:                StepOptions options = step.getOptions();
0298:                Ticket ticket = options.getTicket();
0299:                Object id = ticket.getIdentifier();
0300:
0301:                StepStatus status = step.getStatus();
0302:
0303:                // check for immediate timeout
0304:                long nowTime = System.currentTimeMillis();
0305:                long pauseTime = options.getPauseTime();
0306:                long timeoutTime = options.getTimeoutTime();
0307:                if (((timeoutTime > 0) && (timeoutTime <= nowTime))
0308:                        || ((pauseTime > 0) && (pauseTime <= nowTime) && (status
0309:                                .getState() != StepStatus.RUNNING))) {
0310:
0311:                    if (status.getState() == StepStatus.RUNNING) {
0312:                        // restart from RUNNING
0313:                        // alarms are not persisted
0314:                        // remove possible MoveAgent object
0315:                        MoveAgent ma = findMove(id);
0316:                        if (ma != null) {
0317:                            // timeout, don't care if it succeeded!
0318:                            blackboard.publishRemove(ma);
0319:                        }
0320:                    }
0321:
0322:                    status = new StepStatus(StepStatus.TIMEOUT, ((status
0323:                            .getState() == StepStatus.RUNNING) ? status
0324:                            .getStartTime() : nowTime), nowTime, null);
0325:                    step.setStatus(status);
0326:                    blackboard.publishChange(step);
0327:
0328:                    if (log.isDebugEnabled()) {
0329:                        log.debug(todd + "Step " + step.getUID()
0330:                                + " added, but already timed out: " + " now ("
0331:                                + nowTime + "), " + " pause (" + pauseTime
0332:                                + "), " + " timeout (" + timeoutTime + ")");
0333:                    }
0334:                    return;
0335:                }
0336:
0337:                MessageAddress mobileAgent = ticket.getMobileAgent();
0338:                if (mobileAgent == null) {
0339:                    mobileAgent = agentId;
0340:                }
0341:
0342:                // check white pages.  Okay if agent is not listed.
0343:                WPInfo origWPI = lookupInWP(mobileAgent, 30000); // max wait
0344:                if (log.isDebugEnabled()) {
0345:                    log.debug(todd + "Step " + step.getUID()
0346:                            + " added, wp entry for agent " + mobileAgent
0347:                            + " is " + origWPI);
0348:                }
0349:
0350:                // create new Entry for this ticket
0351:                // add step to table
0352:                Entry entry = new Entry(ticket, origWPI, step);
0353:                idToEntry.put(id, entry);
0354:
0355:                if ((pauseTime > 0)
0356:                        && (status.getState() != StepStatus.RUNNING)) {
0357:
0358:                    // set step status to PAUSED
0359:                    if (status.getState() != StepStatus.PAUSED) {
0360:                        status = new StepStatus(StepStatus.PAUSED, -1, -1, null);
0361:                        step.setStatus(status);
0362:                        blackboard.publishChange(step);
0363:                    }
0364:
0365:                    // create wakeup alarm
0366:                    MyAlarm alarm = addAlarm(pauseTime, id);
0367:                    entry.alarm = alarm;
0368:
0369:                    if (log.isDebugEnabled()) {
0370:                        log.debug(todd + "Step " + step.getUID()
0371:                                + " pausing for " + (pauseTime - nowTime)
0372:                                + " millis (" + pauseTime + " - " + nowTime
0373:                                + ")");
0374:                    }
0375:                    return;
0376:                }
0377:
0378:                startEntry(entry);
0379:            }
0380:
0381:            private void startEntry(Entry entry) {
0382:                // step already listed in table, and pause-time has passed
0383:                Step step = entry.step;
0384:
0385:                StepOptions options = step.getOptions();
0386:                StepStatus status = step.getStatus();
0387:
0388:                Ticket ticket = options.getTicket();
0389:                Object id = ticket.getIdentifier();
0390:
0391:                if (log.isDebugEnabled()) {
0392:                    long nowTime = System.currentTimeMillis();
0393:                    log.debug(todd + "Start step " + step.getUID());
0394:                }
0395:
0396:                // don't check for a step in progress for this agent; it's
0397:                // something we want to test.
0398:
0399:                if (status.getState() != StepStatus.RUNNING) {
0400:                    // create new move request
0401:                    MoveAgent ma = mobilityFactory.createMoveAgent(ticket);
0402:                    blackboard.publishAdd(ma);
0403:                    entry.moveAgent = ma;
0404:
0405:                    if (log.isDebugEnabled()) {
0406:                        log.debug(todd + "Created new MoveAgent object "
0407:                                + ma.getUID() + " for step " + step.getUID());
0408:                    }
0409:
0410:                    // transition step status from (UNSEEN|PAUSED) to RUNNING
0411:                    status = new StepStatus(StepStatus.RUNNING, System
0412:                            .currentTimeMillis(), -1, null);
0413:                    step.setStatus(status);
0414:                    blackboard.publishChange(step);
0415:
0416:                    if (log.isDebugEnabled()) {
0417:                        log.debug(todd + "Step " + step.getUID()
0418:                                + " is now RUNNING");
0419:                    }
0420:                } else {
0421:                    // find existing move request
0422:                    MoveAgent ma = findMove(id);
0423:                    if (ma == null) {
0424:                        if (log.isErrorEnabled()) {
0425:                            log
0426:                                    .error("Recreated step "
0427:                                            + step.getUID()
0428:                                            + " but unable to find matching MoveAgent object"
0429:                                            + " with ticket id " + id);
0430:                        }
0431:                    } else {
0432:                        entry.moveAgent = ma;
0433:                        if (log.isDebugEnabled()) {
0434:                            log.debug("Recreated step " + step.getUID()
0435:                                    + " found matching MoveAgent object "
0436:                                    + ma.getUID());
0437:                        }
0438:                        // need to updateMove now?
0439:                    }
0440:
0441:                    if (log.isDebugEnabled()) {
0442:                        log.debug(todd + "Step " + step.getUID()
0443:                                + " resumed RUNNING");
0444:                    }
0445:                }
0446:
0447:                // start timer
0448:                long timeoutTime = options.getTimeoutTime();
0449:                if (timeoutTime > 0) {
0450:                    MyAlarm alarm = addAlarm(timeoutTime, id);
0451:                    entry.alarm = alarm;
0452:
0453:                    if (log.isDebugEnabled()) {
0454:                        long nowTime = System.currentTimeMillis();
0455:                        log.debug(todd + "Started step " + step.getUID()
0456:                                + " with timeout in " + (timeoutTime - nowTime)
0457:                                + " millis (" + timeoutTime + " - " + nowTime
0458:                                + ")");
0459:                    }
0460:                } else {
0461:                    if (log.isDebugEnabled()) {
0462:                        log
0463:                                .debug(todd + "No timeout for step "
0464:                                        + step.getUID());
0465:                    }
0466:                }
0467:            }
0468:
0469:            private void timeoutEntry(Entry entry) {
0470:                // entry has been removed from table
0471:
0472:                Step step = entry.step;
0473:                StepStatus status = step.getStatus();
0474:
0475:                // transition step status from RUNNING to TIMEOUT
0476:                long nowTime = System.currentTimeMillis();
0477:                status = new StepStatus(StepStatus.TIMEOUT, status
0478:                        .getStartTime(), nowTime, null);
0479:                step.setStatus(status);
0480:                blackboard.publishChange(step);
0481:
0482:                // alarm should be expired
0483:                MyAlarm alarm = entry.alarm;
0484:                if (alarm == null) {
0485:                    if (step.getOptions().getTimeoutTime() > 0) {
0486:                        if (log.isErrorEnabled()) {
0487:                            log.error(todd + "Timeout with no alarm: " + step);
0488:                        }
0489:                    }
0490:                } else if (!(alarm.hasExpired())) {
0491:                    if (log.isErrorEnabled()) {
0492:                        log.error(todd + "Timeout with non-expired alarm: "
0493:                                + alarm);
0494:                    }
0495:                    alarm.cancel();
0496:                }
0497:
0498:                // remove move-agent object
0499:                MoveAgent ma = entry.moveAgent;
0500:                if (ma != null) {
0501:                    if (ma.getStatus() != null) {
0502:                        if (log.isErrorEnabled()) {
0503:                            log.error(todd + "Timeout with move-agent status: "
0504:                                    + ma);
0505:                        }
0506:                    }
0507:                    if (log.isInfoEnabled()) {
0508:                        log
0509:                                .info(todd
0510:                                        + "Removing MoveAgent object "
0511:                                        + ma.getUID()
0512:                                        + " after move timeout.  If the move later completes, the"
0513:                                        + " MoveAgentPlugin may complain.");
0514:                    }
0515:                    blackboard.publishRemove(ma);
0516:                }
0517:
0518:                if (log.isDebugEnabled()) {
0519:                    log.debug(todd + "Timed out on step " + step.getUID()
0520:                            + " at time " + nowTime);
0521:                }
0522:            }
0523:
0524:            private void handleDueTicketIds(List ids) {
0525:                // lookup entry in table
0526:                for (int i = 0, n = ids.size(); i < n; i++) {
0527:                    Object id = ids.get(i);
0528:                    Entry entry = (Entry) idToEntry.get(id);
0529:                    if (entry == null) {
0530:                        // removed, or already finished
0531:                        if (log.isDebugEnabled()) {
0532:                            log.debug(todd
0533:                                    + "Ignoring alarm for unknown ticket id "
0534:                                    + id);
0535:                        }
0536:                        continue;
0537:                    }
0538:                    Step step = entry.step;
0539:                    int state = step.getStatus().getState();
0540:                    if (state == StepStatus.PAUSED) {
0541:                        // verify that pause time has passed
0542:                        long nowTime = System.currentTimeMillis();
0543:                        long pauseTime = step.getOptions().getPauseTime();
0544:                        if (nowTime < pauseTime) {
0545:                            // interrupted?
0546:                            if (log.isWarnEnabled()) {
0547:                                log.warn(todd + "Alarm pause resume at ("
0548:                                        + nowTime + ")"
0549:                                        + " is less than pause time ("
0550:                                        + pauseTime + "),"
0551:                                        + " will proceed anyways");
0552:                            }
0553:                            // proceed anyways
0554:                        }
0555:                        // start the entry
0556:                        startEntry(entry);
0557:                    } else if (state == StepStatus.RUNNING) {
0558:                        // timeout now
0559:                        idToEntry.remove(id);
0560:                        timeoutEntry(entry);
0561:                    } else {
0562:                        // ignore, already completed
0563:                    }
0564:                }
0565:            }
0566:
0567:            private void removeStep(Step step) {
0568:                Ticket ticket = step.getOptions().getTicket();
0569:                Object id = ticket.getIdentifier();
0570:                // lookup entry in table
0571:                Entry entry = (Entry) idToEntry.remove(id);
0572:                if (entry == null) {
0573:                    // already completed, removed, or not known
0574:                    if (log.isDebugEnabled()) {
0575:                        log.debug(todd + "Remove step " + step.getUID()
0576:                                + " didn't find the step in the table");
0577:                    }
0578:                    return;
0579:                }
0580:
0581:                // step no longer in blackboard
0582:
0583:                // alarm may be running
0584:                MyAlarm alarm = entry.alarm;
0585:                if ((alarm != null) && (!(alarm.hasExpired()))) {
0586:                    alarm.cancel();
0587:                }
0588:
0589:                // remove move-agent object
0590:                MoveAgent ma = entry.moveAgent;
0591:                if (ma != null) {
0592:                    blackboard.publishRemove(ma);
0593:                }
0594:
0595:                if (log.isDebugEnabled()) {
0596:                    log.debug(todd + "Removed step " + step.getUID());
0597:                }
0598:            }
0599:
0600:            private void updateMove(MoveAgent ma) {
0601:                // get entry
0602:                Ticket ticket = ma.getTicket();
0603:                Object id = ticket.getIdentifier();
0604:                Entry entry = (Entry) idToEntry.get(id);
0605:                if (entry == null) {
0606:                    if (log.isErrorEnabled()) {
0607:                        log.error(todd
0608:                                + "Move updated, but step no longer exists: "
0609:                                + ma);
0610:                    }
0611:                    // maybe not our move!
0612:                    //blackboard.publishRemove(ma);
0613:                    return;
0614:                }
0615:                // check entry
0616:                if (ma != entry.moveAgent) {
0617:                    if (log.isErrorEnabled()) {
0618:                        log.error(todd
0619:                                + "Move updated, but given move-agent object "
0620:                                + ma
0621:                                + " doesn't == the entry move-agent object "
0622:                                + entry.moveAgent);
0623:                    }
0624:                    return;
0625:                }
0626:
0627:                Step step = entry.step;
0628:                StepStatus status = step.getStatus();
0629:                int state = status.getState();
0630:                if (state != StepStatus.RUNNING) {
0631:                    if (log.isErrorEnabled()) {
0632:                        log.error(todd + "Move updated, but step status is ("
0633:                                + state + ") instead of RUNNING");
0634:                    }
0635:                    return;
0636:                }
0637:                // check status
0638:                MoveAgent.Status mstat = ma.getStatus();
0639:                if (mstat == null) {
0640:                    // still in progress?
0641:                    if (log.isDebugEnabled()) {
0642:                        log.debug(todd + "Step " + step.getUID()
0643:                                + " ignoring null move-agent " + ma.getUID()
0644:                                + " status");
0645:                    }
0646:                    return;
0647:                }
0648:
0649:                int newState = StepStatus.FAILURE;
0650:                if (mstat.getCode() != MoveAgent.Status.OKAY) {
0651:                    // move itself failed
0652:                    if (log.isErrorEnabled()) {
0653:                        log.error(todd + "Step " + step.getUID()
0654:                                + " finished with non-OKAY status "
0655:                                + mstat.getCodeAsString());
0656:                    }
0657:                } else {
0658:                    // check wp.  The "entry.wpi" may be null, but
0659:                    // the current wpi should not be null.
0660:                    MessageAddress mobileAgent = ticket.getMobileAgent();
0661:                    if (mobileAgent == null) {
0662:                        mobileAgent = agentId;
0663:                    }
0664:                    MessageAddress destNode = ticket.getDestinationNode();
0665:                    WPInfo origWPI = entry.wpi;
0666:
0667:                    // the wp update may take several seconds, so we
0668:                    // do a limited backoff-n-retry in case the entry looks stale.
0669:                    //
0670:                    // FIXME: ideally we would release the plugin execute thread
0671:                    // and set an alarm, but for now we'll simply sleep.
0672:                    WPInfo newWPI;
0673:                    for (int i = 0, maxi = 5;; i++) {
0674:                        newWPI = lookupInWP(mobileAgent, 30000);
0675:                        if (newWPI == null) {
0676:                            // not listed in wp?
0677:                            if (i == 0) {
0678:                                if (log.isInfoEnabled()) {
0679:                                    log.info("Will re-examine " + mobileAgent
0680:                                            + "'s null wp entry (step: "
0681:                                            + step.getUID() + ")");
0682:                                }
0683:                            } else if (i >= maxi) {
0684:                                if (log.isErrorEnabled()) {
0685:                                    log
0686:                                            .error("Step " + step.getUID()
0687:                                                    + " failed due to null wp"
0688:                                                    + " entry for agent "
0689:                                                    + mobileAgent);
0690:                                }
0691:                                break;
0692:                            }
0693:                        } else if ((destNode != null)
0694:                                && (!(destNode.getAddress().equals(newWPI.node)))) {
0695:                            // at the wrong node!
0696:                            if (i == 0) {
0697:                                if (log.isInfoEnabled()) {
0698:                                    log
0699:                                            .info("Will re-examine "
0700:                                                    + mobileAgent
0701:                                                    + "'s wp entry, which indicates that"
0702:                                                    + " the agent is at node "
0703:                                                    + newWPI.node
0704:                                                    + " instead of the move destination node "
0705:                                                    + destNode + " (step: "
0706:                                                    + step.getUID() + ")");
0707:                                }
0708:                            } else if (i >= maxi) {
0709:                                if (log.isErrorEnabled()) {
0710:                                    log
0711:                                            .error("Step "
0712:                                                    + step.getUID()
0713:                                                    + " failed due to wp listing of agent "
0714:                                                    + mobileAgent
0715:                                                    + " at node "
0716:                                                    + newWPI.node
0717:                                                    + " instead of ticket's destination node "
0718:                                                    + destNode
0719:                                                    + ", wp entry is " + newWPI);
0720:                                }
0721:                                newWPI = null;
0722:                                break;
0723:                            }
0724:                        } else {
0725:                            // seems okay
0726:                            break;
0727:                        }
0728:
0729:                        // FIXME release thread & use alarm!!!
0730:                        try {
0731:                            Thread.sleep(i * 2000);
0732:                        } catch (Exception e) {
0733:                            log.error("Cancel sleep", e);
0734:                        }
0735:                    }
0736:
0737:                    if (newWPI == null) {
0738:                        // already logged our error above
0739:                    } else if ((destNode == null)
0740:                            && (!(origWPI.node.equals(newWPI.node)))) {
0741:                        // didn't restart in place!
0742:                        if (log.isErrorEnabled()) {
0743:                            log.error("Step " + step.getUID()
0744:                                    + " finished restart-in-place of agent "
0745:                                    + mobileAgent + " started at node "
0746:                                    + origWPI.node
0747:                                    + " and ended up at a different node "
0748:                                    + newWPI.node + ", wp entry is " + newWPI);
0749:                        }
0750:                    } else if (newWPI.inc != origWPI.inc) {
0751:                        // incarnation number is wrong!  maybe crashed during move.
0752:                        if (log.isErrorEnabled()) {
0753:                            log
0754:                                    .error("Step "
0755:                                            + step.getUID()
0756:                                            + " finished with incarnation number "
0757:                                            + newWPI.inc
0758:                                            + " that doesn't match the prior incarnation number "
0759:                                            + origWPI.inc);
0760:                        }
0761:                    } else {
0762:                        // success!
0763:                        newState = StepStatus.SUCCESS;
0764:                    }
0765:                }
0766:
0767:                // update step
0768:                status = new StepStatus(newState, status.getStartTime(), System
0769:                        .currentTimeMillis(), mstat);
0770:                step.setStatus(status);
0771:                blackboard.publishChange(step);
0772:
0773:                if (log.isDebugEnabled()) {
0774:                    log.debug(todd + "Step " + step.getUID()
0775:                            + " is now in status " + status.getStateAsString());
0776:                }
0777:
0778:                // cancel alarm
0779:                MyAlarm alarm = entry.alarm;
0780:                if (alarm != null) {
0781:                    entry.alarm = null;
0782:                    if (!(alarm.hasExpired())) {
0783:                        alarm.cancel();
0784:                        if (log.isDebugEnabled()) {
0785:                            log.debug(todd + "Cancelled alarm for step "
0786:                                    + step.getUID());
0787:                        }
0788:                    }
0789:                }
0790:
0791:                // keep entry for later removal
0792:
0793:                if (log.isDebugEnabled()) {
0794:                    log.debug(todd + "Step " + step.getUID()
0795:                            + " completed move with status " + mstat);
0796:                }
0797:            }
0798:
0799:            private MyAlarm addAlarm(long time, Object id) {
0800:                MyAlarm alarm = new MyAlarm(time, id);
0801:                getAlarmService().addRealTimeAlarm(alarm);
0802:                pendingAlarms.add(alarm);
0803:                return alarm;
0804:            }
0805:
0806:            private void cancelAlarms() {
0807:                // FIXME optimize
0808:                for (int i = 0, n = pendingAlarms.size(); i < n; i++) {
0809:                    MyAlarm ai = (MyAlarm) pendingAlarms.get(i);
0810:                    if (!(ai.hasExpired())) {
0811:                        ai.cancel();
0812:                    }
0813:                }
0814:                pendingAlarms.clear();
0815:            }
0816:
0817:            private List getDueTicketIds() {
0818:                // FIXME optimize
0819:                List l = null;
0820:                for (int i = 0, n = pendingAlarms.size(); i < n; i++) {
0821:                    MyAlarm ai = (MyAlarm) pendingAlarms.get(i);
0822:                    if (ai.hasExpired()) {
0823:                        if (l == null) {
0824:                            l = new ArrayList(Math.min((n - i), 5));
0825:                        }
0826:                        l.add(ai.getId());
0827:                        pendingAlarms.remove(i);
0828:                        --i;
0829:                        --n;
0830:                    }
0831:                }
0832:                return l;
0833:            }
0834:
0835:            private WPInfo lookupInWP(MessageAddress agent, long timeout) {
0836:                try {
0837:                    // get "node://host/node"
0838:                    AddressEntry nodeAE = wps.get(agent.getAddress(),
0839:                            "topology", timeout);
0840:                    if (nodeAE == null) {
0841:                        if (log.isInfoEnabled()) {
0842:                            log.info("Missing \"topology\" WP entry for "
0843:                                    + agent);
0844:                        }
0845:                        return null;
0846:                    }
0847:                    URI nodeURI = nodeAE.getURI();
0848:                    String host = nodeURI.getHost();
0849:                    String node = nodeURI.getPath().substring(1);
0850:                    // get "version:///incarnation/moveId"
0851:                    AddressEntry versionAE = wps.get(agent.getAddress(),
0852:                            "version", timeout);
0853:                    if (versionAE == null) {
0854:                        if (log.isInfoEnabled()) {
0855:                            log.info("Missing \"version\" WP entry for "
0856:                                    + agent);
0857:                        }
0858:                        return null;
0859:                    }
0860:                    URI versionURI = versionAE.getURI();
0861:                    String tmp = versionURI.getPath();
0862:                    int sep = tmp.indexOf('/', 1);
0863:                    long inc = Long.parseLong(tmp.substring(1, sep));
0864:                    long moveId = Long.parseLong(tmp.substring(sep + 1));
0865:                    return new WPInfo(agent.getAddress(), node, host, inc,
0866:                            moveId);
0867:                } catch (Exception e) {
0868:                    if (log.isInfoEnabled()) {
0869:                        log.info("Failed WP lookup(" + agent + ")", e);
0870:                    }
0871:                    return null;
0872:                }
0873:            }
0874:
0875:            // find MoveAgent with matching ticket-id
0876:            private MoveAgent findMove(Object id) {
0877:                Collection c = moveSub.getCollection();
0878:                int n = c.size();
0879:                if (n > 0) {
0880:                    Iterator iter = c.iterator();
0881:                    for (int i = 0; i < n; i++) {
0882:                        Object o = iter.next();
0883:                        if (o instanceof  MoveAgent) {
0884:                            MoveAgent ma = (MoveAgent) o;
0885:                            MessageAddress a = ma.getSource();
0886:                            if (agentId.equals(a)) {
0887:                                Ticket ticket = ma.getTicket();
0888:                                Object tid = ticket.getIdentifier();
0889:                                if (id.equals(tid)) {
0890:                                    return ma;
0891:                                }
0892:                            }
0893:                        }
0894:                    }
0895:                }
0896:                return null;
0897:            }
0898:
0899:            private static UnaryPredicate createStepPredicate(
0900:                    final MessageAddress agentId) {
0901:                return new UnaryPredicate() {
0902:                    public boolean execute(Object o) {
0903:                        if (o instanceof  Step) {
0904:                            Step step = (Step) o;
0905:                            MessageAddress target = step.getOptions()
0906:                                    .getTarget();
0907:                            return agentId.equals(target);
0908:                        }
0909:                        return false;
0910:                    }
0911:                };
0912:            }
0913:
0914:            private static UnaryPredicate createMovePredicate(
0915:                    final MessageAddress agentId) {
0916:                return new UnaryPredicate() {
0917:                    public boolean execute(Object o) {
0918:                        if (o instanceof  MoveAgent) {
0919:                            MoveAgent ma = (MoveAgent) o;
0920:                            MessageAddress a = ma.getSource();
0921:                            return agentId.equals(a);
0922:                        }
0923:                        return false;
0924:                    }
0925:                };
0926:            }
0927:
0928:            private static UniqueObject query(CollectionSubscription sub,
0929:                    UID uid) {
0930:                Collection real = sub.getCollection();
0931:                int n = real.size();
0932:                if (n > 0) {
0933:                    for (Iterator iter = real.iterator(); iter.hasNext();) {
0934:                        Object o = iter.next();
0935:                        if (o instanceof  UniqueObject) {
0936:                            UniqueObject uo = (UniqueObject) o;
0937:                            UID x = uo.getUID();
0938:                            if (uid.equals(x)) {
0939:                                return uo;
0940:                            }
0941:                        }
0942:                    }
0943:                }
0944:                return null;
0945:            }
0946:
0947:            private static class WPInfo {
0948:                public final String agent;
0949:                public final String node;
0950:                public final String host;
0951:                public final long inc;
0952:                public final long moveId;
0953:
0954:                public WPInfo(String agent, String node, String host, long inc,
0955:                        long moveId) {
0956:                    this .agent = agent;
0957:                    this .node = node;
0958:                    this .host = host;
0959:                    this .inc = inc;
0960:                    this .moveId = moveId;
0961:                }
0962:
0963:                public String toString() {
0964:                    return "(agent=" + agent + ", node=" + node + ", host="
0965:                            + host + ", inc=" + inc + ", moveId=" + moveId
0966:                            + ")";
0967:                }
0968:            }
0969:
0970:            private class MyAlarm implements  Alarm, Comparable {
0971:                private final long expirationTime;
0972:                private boolean expired = false;
0973:
0974:                private final Object id;
0975:
0976:                public MyAlarm(long expirationTime, Object id) {
0977:                    this .expirationTime = expirationTime;
0978:                    this .id = id;
0979:                }
0980:
0981:                public Object getId() {
0982:                    return id;
0983:                }
0984:
0985:                public long getExpirationTime() {
0986:                    return expirationTime;
0987:                }
0988:
0989:                public synchronized void expire() {
0990:                    if (!expired) {
0991:                        expired = true;
0992:                        if (log.isDebugEnabled()) {
0993:                            log.debug(todd + "Alarm for ticket " + id
0994:                                    + " expired");
0995:                        }
0996:                        if (blackboard != null) {
0997:                            blackboard.signalClientActivity();
0998:                        } else {
0999:                            // bug 989?
1000:                        }
1001:                    }
1002:                }
1003:
1004:                public boolean hasExpired() {
1005:                    return expired;
1006:                }
1007:
1008:                public synchronized boolean cancel() {
1009:                    boolean was = expired;
1010:                    expired = true;
1011:                    return was;
1012:                }
1013:
1014:                public boolean equals(Object o) {
1015:                    if (o instanceof  MyAlarm) {
1016:                        long ot = ((MyAlarm) o).expirationTime;
1017:                        return (expirationTime == ot);
1018:                    }
1019:                    return false;
1020:                }
1021:
1022:                public int compareTo(Object o) {
1023:                    long ot = ((MyAlarm) o).expirationTime;
1024:                    return (expirationTime < ot) ? -1
1025:                            : (expirationTime > ot) ? 1 : 0;
1026:                }
1027:
1028:                public String toString() {
1029:                    return "Alarm {" + "\n  expirationTime: " + expirationTime
1030:                            + "\n  expired: " + expired + "\n  id: " + id
1031:                            + "\n}";
1032:                }
1033:            }
1034:
1035:            private static class Entry {
1036:                public final Ticket ticket;
1037:                public final WPInfo wpi;
1038:                public final Step step;
1039:                public MyAlarm alarm;
1040:                public MoveAgent moveAgent;
1041:
1042:                public Entry(Ticket ticket, WPInfo wpi, Step step) {
1043:                    this .ticket = ticket;
1044:                    this .wpi = wpi;
1045:                    this .step = step;
1046:                    if ((ticket == null) || (step == null)) {
1047:                        throw new IllegalArgumentException("null ticket/step");
1048:                    }
1049:                }
1050:
1051:                public String toString() {
1052:                    return "Entry {" + "\n ticket:    " + ticket
1053:                            + "\n wp info:   " + wpi + "\n step:      " + step
1054:                            + "\n alarm:     " + alarm + "\n moveAgent: "
1055:                            + moveAgent + "\n}";
1056:                }
1057:            }
1058:
1059:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.