Source Code Cross Referenced for RootMobilityPlugin.java in » Science » Cougaar12_4 » org » cougaar » core » mobility » service » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.core.mobility.service 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * <copyright>
0003:         *  
0004:         *  Copyright 2001-2004 BBNT Solutions, LLC
0005:         *  under sponsorship of the Defense Advanced Research Projects
0006:         *  Agency (DARPA).
0007:         * 
0008:         *  You can redistribute this software and/or modify it under the
0009:         *  terms of the Cougaar Open Source License as published on the
0010:         *  Cougaar Open Source Website (www.cougaar.org).
0011:         * 
0012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023:         *  
0024:         * </copyright>
0025:         */
0026:
0027:        package org.cougaar.core.mobility.service;
0028:
0029:        import java.net.URI;
0030:        import java.util.ArrayList;
0031:        import java.util.HashMap;
0032:        import java.util.Iterator;
0033:        import java.util.List;
0034:        import java.util.Map;
0035:        import org.cougaar.core.agent.AgentContainer;
0036:        import org.cougaar.core.component.ComponentDescription;
0037:        import org.cougaar.core.component.StateTuple;
0038:        import org.cougaar.core.mobility.AbstractTicket;
0039:        import org.cougaar.core.mobility.AddTicket;
0040:        import org.cougaar.core.mobility.MobilityClient;
0041:        import org.cougaar.core.mobility.MobilityException;
0042:        import org.cougaar.core.mobility.MoveTicket;
0043:        import org.cougaar.core.mobility.RemoveTicket;
0044:        import org.cougaar.core.mobility.arch.AbstractHandler;
0045:        import org.cougaar.core.mobility.arch.AckHandler;
0046:        import org.cougaar.core.mobility.arch.ArrivalHandler;
0047:        import org.cougaar.core.mobility.arch.DispatchRemoteHandler;
0048:        import org.cougaar.core.mobility.arch.DispatchTestHandler;
0049:        import org.cougaar.core.mobility.arch.MobilitySupport;
0050:        import org.cougaar.core.mobility.arch.NackHandler;
0051:        import org.cougaar.core.mobility.ldm.AgentControl;
0052:        import org.cougaar.core.mts.MessageAddress;
0053:        import org.cougaar.core.service.BlackboardService;
0054:        import org.cougaar.core.service.LoggingService;
0055:        import org.cougaar.core.service.wp.AddressEntry;
0056:        import org.cougaar.core.service.wp.WhitePagesService;
0057:        import org.cougaar.core.util.UID;
0058:        import org.cougaar.core.util.UniqueObject;
0059:        import org.cougaar.util.GenericStateModel;
0060:
0061:        /**
0062:         * This component coordinates agent mobility and handles agent
0063:         * add/remove requests.
0064:         */
0065:        public class RootMobilityPlugin extends AbstractMobilityPlugin {
0066:
0067:            // Map from an agent's MessageAddress to its AgentEntry.
0068:            //
0069:            // This is used to guarantee only one control at a time per agent,
0070:            // and to hold onto an agent while it's awaiting the control
0071:            // response. The methods shown here access it under synchronized (entries).
0072:            private final Map entries = new HashMap(13);
0073:
0074:            //
0075:            // handle control add/change/remove.
0076:            //
0077:            // FIXME refactor into a "switch" with pluggable handlers for each
0078:            // ticket class.
0079:            //
0080:
0081:            /** a new request for the control of a local agent. */
0082:            protected void addedAgentControl(AgentControl control) {
0083:                if (!(isNode))
0084:                    return;
0085:
0086:                AbstractTicket abstractTicket = control.getAbstractTicket();
0087:
0088:                if (log.isDebugEnabled()) {
0089:                    log.debug("Observed add of " + control.getUID());
0090:                }
0091:
0092:                if (abstractTicket instanceof  AddTicket) {
0093:                    add_add(control, (AddTicket) abstractTicket);
0094:                } else if (abstractTicket instanceof  RemoveTicket) {
0095:                    add_remove(control, (RemoveTicket) abstractTicket);
0096:                } else if (abstractTicket instanceof  MoveTicket) {
0097:                    add_move(control, (MoveTicket) abstractTicket);
0098:                } else if (abstractTicket instanceof  TransferTicket) {
0099:                    add_transfer(control, (TransferTicket) abstractTicket);
0100:                } else {
0101:                    // ignore
0102:                }
0103:            }
0104:
0105:            /** a control was changed. */
0106:            protected void changedAgentControl(AgentControl control) {
0107:                if (!(isNode))
0108:                    return;
0109:
0110:                AbstractTicket abstractTicket = control.getAbstractTicket();
0111:
0112:                if (log.isDebugEnabled()) {
0113:                    log.debug("Observed change of " + control.getUID());
0114:                }
0115:
0116:                if (abstractTicket instanceof  AddTicket) {
0117:                    change_add(control, (AddTicket) abstractTicket);
0118:                } else if (abstractTicket instanceof  RemoveTicket) {
0119:                    change_remove(control, (RemoveTicket) abstractTicket);
0120:                } else if (abstractTicket instanceof  MoveTicket) {
0121:                    change_move(control, (MoveTicket) abstractTicket);
0122:                } else if (abstractTicket instanceof  TransferTicket) {
0123:                    change_transfer(control, (TransferTicket) abstractTicket);
0124:                } else {
0125:                    // ignore
0126:                }
0127:            }
0128:
0129:            /** a control was removed. */
0130:            protected void removedAgentControl(AgentControl control) {
0131:                if (!(isNode))
0132:                    return;
0133:
0134:                AbstractTicket abstractTicket = control.getAbstractTicket();
0135:
0136:                if (log.isDebugEnabled()) {
0137:                    log.debug("Observed removal of " + control.getUID());
0138:                }
0139:
0140:                if (abstractTicket instanceof  AddTicket) {
0141:                    remove_add(control, (AddTicket) abstractTicket);
0142:                } else if (abstractTicket instanceof  RemoveTicket) {
0143:                    remove_remove(control, (RemoveTicket) abstractTicket);
0144:                } else if (abstractTicket instanceof  MoveTicket) {
0145:                    remove_move(control, (MoveTicket) abstractTicket);
0146:                } else if (abstractTicket instanceof  TransferTicket) {
0147:                    remove_transfer(control, (TransferTicket) abstractTicket);
0148:                } else {
0149:                    // ignore
0150:                }
0151:            }
0152:
0153:            /** an agent registers as a mobile agent in the local node. */
0154:            protected void registerAgent(MessageAddress id,
0155:                    ComponentDescription desc, MobilityClient agent) {
0156:                // add entry to the table
0157:                synchronized (entries) {
0158:                    AgentEntry ae = (AgentEntry) entries.get(id);
0159:                    boolean isNew;
0160:                    if (ae == null) {
0161:                        // new agent
0162:                        isNew = true;
0163:                        ae = new AgentEntry(id);
0164:                        entries.put(id, ae);
0165:                    } else if (ae.isRegistered) {
0166:                        // already registered?
0167:                        throw new RuntimeException("Agent " + id
0168:                                + " is already registered on node " + nodeId
0169:                                + ": " + ae);
0170:                    } else {
0171:                        // mobile agent arrival
0172:                        isNew = false;
0173:                    }
0174:                    if (log.isDebugEnabled()) {
0175:                        log.debug("Registered agent " + id + " on node "
0176:                                + nodeId + ", which "
0177:                                + (isNew ? "is a new" : "already had an")
0178:                                + " entry (oid: " + System.identityHashCode(ae)
0179:                                + "): " + ae + ", the description "
0180:                                + objectCompare(ae.desc, desc) + " and agent "
0181:                                + objectCompare(ae.agent, agent));
0182:                    }
0183:                    if (ae.state != null) {
0184:                        Object tmp = ae.state;
0185:                        if (tmp instanceof  LocalMoveState) {
0186:                            tmp = ((LocalMoveState) tmp).state;
0187:                        }
0188:                        ae.state = null;
0189:                        if (log.isDebugEnabled()) {
0190:                            log.debug("Setting state for agent " + id);
0191:                        }
0192:                        agent.setState(tmp);
0193:                    }
0194:                    ae.desc = desc;
0195:                    ae.agent = agent;
0196:                    ae.isRegistered = true;
0197:                }
0198:            }
0199:
0200:            /** an agent unregisters itself from the local mobility registry. */
0201:            protected void unregisterAgent(MessageAddress id) {
0202:                synchronized (entries) {
0203:                    AgentEntry ae = (AgentEntry) entries.get(id);
0204:                    if (ae == null || !ae.isRegistered) {
0205:                        // already removed?
0206:                        if (log.isErrorEnabled()) {
0207:                            log.error("Attempted to unregister agent " + id
0208:                                    + " on node " + nodeId
0209:                                    + ", but the agent is not "
0210:                                    + (ae == null ? "listed" : "registered"));
0211:                        }
0212:                        return;
0213:                    }
0214:                    ae.isRegistered = false;
0215:                    boolean removed;
0216:                    if (ae.pendingAction == AgentEntry.NONE) {
0217:                        // no longer needed
0218:                        entries.remove(id);
0219:                        removed = true;
0220:                    } else {
0221:                        // agent is unloading as part of move,
0222:                        // keep the entry in case the move fails
0223:                        removed = false;
0224:                    }
0225:                    if (log.isDebugEnabled()) {
0226:                        log.debug("Unregistered agent " + id + " on node "
0227:                                + nodeId + ", "
0228:                                + (removed ? "removed" : "keeping the")
0229:                                + " entry (oid: " + System.identityHashCode(ae)
0230:                                + "): " + ae);
0231:                    }
0232:                }
0233:            }
0234:
0235:            private static String objectCompare(Object a, Object b) {
0236:                String astr = (a == null ? "null" : ("(oid: "
0237:                        + System.identityHashCode(a) + " " + a + ")"));
0238:                if (a == b) {
0239:                    return "is identical " + astr;
0240:                }
0241:                String bstr = (b == null ? "null" : ("(oid: "
0242:                        + System.identityHashCode(b) + " " + b + ")"));
0243:                return ((a != null && a.equals(b)) ? "is equivalent"
0244:                        : "has changed")
0245:                        + " from prior " + astr + " to new " + bstr;
0246:            }
0247:
0248:            //
0249:            //
0250:            //
0251:
0252:            private void add_add(AgentControl control, AddTicket addTicket) {
0253:                if (log.isDetailEnabled()) {
0254:                    log.detail("add_add(" + control + ", " + addTicket + ")");
0255:                }
0256:
0257:                MessageAddress id = addTicket.getMobileAgent();
0258:                MessageAddress destNode = addTicket.getDestinationNode();
0259:                ComponentDescription desc = addTicket.getComponentDescription();
0260:
0261:                // check if this node is the destination node
0262:                if ((destNode != null) && (!destNode.equals(nodeId))) {
0263:                    // not for me!  let the RedirectMovePlugin forward the request
0264:                    // to the other node.
0265:                    return;
0266:                }
0267:
0268:                // FIXME consider locking in registry, to prevent multiple
0269:                //   simultaneous add/removes
0270:                Object state = addTicket.getState();
0271:                if (state != null) {
0272:                    throw new UnsupportedOperationException(
0273:                            "AddTicket with state is not implemented yet");
0274:                }
0275:
0276:                // run outside this transaction, to 
0277:                //   a) prevent blocking, and
0278:                //   b) avoid nested transactions (bug 1750)
0279:                AddAgentRunner aar = new AddAgentRunner(id, control, desc);
0280:                queue(id, aar, aar.pendingTuples);
0281:            }
0282:
0283:            private void change_add(AgentControl control, AddTicket addTicket) {
0284:                if (log.isDetailEnabled()) {
0285:                    log
0286:                            .detail("change_add(" + control + ", " + addTicket
0287:                                    + ")");
0288:                }
0289:            }
0290:
0291:            private void remove_add(AgentControl control, AddTicket addTicket) {
0292:                if (log.isDetailEnabled()) {
0293:                    log
0294:                            .detail("remove_add(" + control + ", " + addTicket
0295:                                    + ")");
0296:                }
0297:            }
0298:
0299:            private void add_remove(AgentControl control,
0300:                    RemoveTicket removeTicket) {
0301:                if (log.isDetailEnabled()) {
0302:                    log.detail("add_remove(" + control + ", " + removeTicket
0303:                            + ")");
0304:                }
0305:
0306:                // handle remove
0307:                MessageAddress id = removeTicket.getMobileAgent();
0308:                MessageAddress destNode = removeTicket.getDestinationNode();
0309:
0310:                // check if this node is the destination node
0311:                if ((destNode != null) && (!destNode.equals(nodeId))) {
0312:                    // not for me!  let the RedirectMovePlugin forward the request
0313:                    // to the other node.
0314:                    return;
0315:                }
0316:
0317:                // FIXME consider locking in registry, to prevent multiple
0318:                //   simultaneous add/removes
0319:
0320:                // run outside this transaction, to 
0321:                //   a) prevent blocking, and
0322:                //   b) avoid nested transactions (bug 1750)
0323:                RemoveAgentRunner rar = new RemoveAgentRunner(id, control);
0324:                queue(id, rar, rar.pendingTuples);
0325:            }
0326:
0327:            private void change_remove(AgentControl control,
0328:                    RemoveTicket removeTicket) {
0329:                if (log.isDetailEnabled()) {
0330:                    log.detail("change_remove(" + control + ", " + removeTicket
0331:                            + ")");
0332:                }
0333:            }
0334:
0335:            private void remove_remove(AgentControl control,
0336:                    RemoveTicket removeTicket) {
0337:                if (log.isDetailEnabled()) {
0338:                    log.detail("remove_remove(" + control + ", " + removeTicket
0339:                            + ")");
0340:                }
0341:            }
0342:
0343:            private void add_move(AgentControl control, MoveTicket moveTicket) {
0344:                if (log.isDetailEnabled()) {
0345:                    log.detail("add_move(" + control + ", " + moveTicket + ")");
0346:                }
0347:
0348:                MessageAddress id = moveTicket.getMobileAgent();
0349:                MessageAddress origNode = moveTicket.getOriginNode();
0350:                MessageAddress destNode = moveTicket.getDestinationNode();
0351:
0352:                if ((id == null) || (id.equals(nodeId))) {
0353:                    String s = "Move request " + control.getUID()
0354:                            + " attempted to move node " + nodeId
0355:                            + " -- nodes are not movable!";
0356:                    if (log.isErrorEnabled()) {
0357:                        log.error(s);
0358:                    }
0359:                    Throwable stack = new RuntimeException(s);
0360:                    control.setStatus(AgentControl.FAILURE, stack);
0361:                    blackboard.publishChange(control);
0362:                    return;
0363:                }
0364:
0365:                if ((origNode != null) && (!(nodeId.equals(origNode)))) {
0366:                    // FIXME note that this assumes that the agent is
0367:                    // on this node, and doesn't do a redirect.
0368:                    String s = "Agent " + id + " is currently on node "
0369:                            + nodeId
0370:                            + ", not on the ticket's asserted origin node "
0371:                            + origNode + " (uid: " + control.getUID() + ")";
0372:                    if (log.isErrorEnabled()) {
0373:                        log.error(s);
0374:                    }
0375:                    Throwable stack = new RuntimeException(s);
0376:                    control.setStatus(AgentControl.FAILURE, stack);
0377:                    blackboard.publishChange(control);
0378:                    return;
0379:                }
0380:
0381:                boolean isLocalMove = ((destNode == null) || (nodeId
0382:                        .equals(destNode)));
0383:
0384:                // check to see if we're already at destination node
0385:                boolean isTrivialMove = (isLocalMove && !moveTicket
0386:                        .isForceRestart());
0387:
0388:                if (!isTrivialMove) {
0389:                    // check remote destination node
0390:                    //
0391:                    // For now we do a quick check to see if the node 
0392:                    // is registered in the WP.
0393:                    //
0394:                    // See bug 1218 for details.
0395:                    String s = null;
0396:                    AddressEntry ae = null;
0397:                    try {
0398:                        ae = whitePagesService.get(destNode.getAddress(),
0399:                                "topology", (30000)); // 30 seconds
0400:                    } catch (Exception e) {
0401:                        s = e.toString();
0402:                    }
0403:                    if (ae == null) {
0404:                        if (s == null) {
0405:                            s = "It's not listed in the white pages";
0406:                        }
0407:                    } else {
0408:                        URI uri = ae.getURI();
0409:                        String path = uri.getPath();
0410:                        String node = (path == null ? null : path.substring(1));
0411:                        if (!destNode.getAddress().equals(node)) {
0412:                            s = "It's not a node agent " + ae;
0413:                        }
0414:                    }
0415:                    if (s != null) {
0416:                        // destination node is invalid!
0417:                        s = "Invalid destination node " + destNode
0418:                                + " for move of agent " + agentId + ": " + s
0419:                                + ", request uid is " + control.getOwnerUID();
0420:                        if (log.isErrorEnabled()) {
0421:                            log.error(s);
0422:                        }
0423:                        Throwable stack = new RuntimeException(s);
0424:                        control.setStatus(AgentControl.FAILURE, stack);
0425:                        blackboard.publishChange(control);
0426:                        return;
0427:                    }
0428:                }
0429:
0430:                // lookup agent in registry, lock in the move
0431:                String errorMsg = null;
0432:                ComponentDescription desc = null;
0433:                MobilityClient agent = null;
0434:                LocalMoveState localMoveState = null;
0435:                synchronized (entries) {
0436:                    // lookup the agent
0437:                    AgentEntry ae = (AgentEntry) entries.get(id);
0438:                    if (ae == null) {
0439:                        // agent is not known on this node
0440:                        errorMsg = "Agent " + id + " is not on node " + nodeId;
0441:                    } else if (ae.pendingAction != AgentEntry.NONE) {
0442:                        // already moving or arriving!
0443:                        errorMsg = "Agent " + id + " on node " + nodeId
0444:                                + " is busy with another move request: " + ae;
0445:                    } else if (!(ae.isRegistered)) {
0446:                        // agent is not registered on this node
0447:                        errorMsg = "Agent " + id + " on node " + nodeId
0448:                                + " is not registered for mobility";
0449:                    } else {
0450:                        // get the desc and agent from registration
0451:                        desc = ae.desc;
0452:                        agent = ae.agent;
0453:                        // mark as moving
0454:                        if (!isTrivialMove) {
0455:                            ae.pendingAction = AgentEntry.MOVE_DEPART;
0456:                            ae.control = control;
0457:                            if (isLocalMove) {
0458:                                localMoveState = new LocalMoveState();
0459:                                ae.state = localMoveState;
0460:                            }
0461:                        }
0462:                    }
0463:                }
0464:
0465:                if (errorMsg != null) {
0466:                    if (log.isErrorEnabled()) {
0467:                        log.error(errorMsg);
0468:                    }
0469:                    Throwable stack = new RuntimeException(errorMsg);
0470:                    control.setStatus(AgentControl.FAILURE, stack);
0471:                    blackboard.publishChange(control);
0472:                    return;
0473:                }
0474:
0475:                if (isTrivialMove) {
0476:                    // trivial success -- the agent is already
0477:                    // at the destination node
0478:                    if (log.isInfoEnabled()) {
0479:                        log.info("Agent " + id + " is already at node "
0480:                                + nodeId + ", responding with trivial success");
0481:                    }
0482:                    control.setStatus(AgentControl.MOVED, null);
0483:                    blackboard.publishChange(control);
0484:                    return;
0485:                }
0486:
0487:                // entries contains this move
0488:
0489:                // assume that the agent itself provides the state
0490:                MobilityClient stateProvider = agent;
0491:                // assume that the agent itself regulates its model
0492:                GenericStateModel model = agent;
0493:
0494:                MobilitySupportImpl support = new MobilitySupportImpl(agent,
0495:                        control, null, id, moveTicket);
0496:
0497:                AbstractHandler h;
0498:                if (isLocalMove) {
0499:                    h = new DispatchTestHandler(support, model, desc,
0500:                            stateProvider, localMoveState);
0501:                } else {
0502:                    h = new DispatchRemoteHandler(support, model, desc,
0503:                            stateProvider);
0504:                }
0505:
0506:                queue(id, h, support);
0507:            }
0508:
0509:            private void change_move(AgentControl control, MoveTicket moveTicket) {
0510:                if (log.isDetailEnabled()) {
0511:                    log.detail("change_move(" + control + ", " + moveTicket
0512:                            + ")");
0513:                }
0514:            }
0515:
0516:            private void remove_move(AgentControl control, MoveTicket moveTicket) {
0517:                if (log.isDetailEnabled()) {
0518:                    log.detail("remove_move(" + control + ", " + moveTicket
0519:                            + ")");
0520:                }
0521:            }
0522:
0523:            private void add_transfer(AgentControl control,
0524:                    TransferTicket transferTicket) {
0525:                if (log.isDetailEnabled()) {
0526:                    log.detail("add_transfer(" + control + ", "
0527:                            + transferTicket + ")");
0528:                }
0529:
0530:                MoveTicket moveTicket = transferTicket.getMoveTicket();
0531:
0532:                MessageAddress destNode = moveTicket.getDestinationNode();
0533:                if (destNode == null) {
0534:                    // not expected, since only remote controls
0535:                    // create transfers
0536:                    if (log.isErrorEnabled()) {
0537:                        log.error("Unexpected agent-transfer "
0538:                                + control.getUID() + " added on node " + nodeId
0539:                                + " with null destination node, ticket: "
0540:                                + moveTicket);
0541:                    }
0542:                    return;
0543:                }
0544:
0545:                if (!(nodeId.equals(destNode))) {
0546:                    if (!nodeId.equals(control.getSource())) {
0547:                        // created by this plugin
0548:                        if (log.isErrorEnabled()) {
0549:                            log.error("Invalid agent transfer with source "
0550:                                    + control.getSource() + " to node "
0551:                                    + destNode + " doesn't match this node "
0552:                                    + nodeId);
0553:                        }
0554:                    }
0555:                    return;
0556:                }
0557:
0558:                MessageAddress id = moveTicket.getMobileAgent();
0559:
0560:                // get the desc and mobile state
0561:                ComponentDescription desc = transferTicket
0562:                        .getComponentDescription();
0563:                Object state = transferTicket.getState();
0564:
0565:                // force GC of the agent state once transfer-ADD completes
0566:                transferTicket.clearState();
0567:
0568:                // make sure agent is not registered, lock in arrival
0569:                String errorMsg = null;
0570:                synchronized (entries) {
0571:                    AgentEntry ae = (AgentEntry) entries.get(id);
0572:                    boolean isNew = false;
0573:                    if (ae == null) {
0574:                        isNew = true;
0575:                        ae = new AgentEntry(id);
0576:                        entries.put(id, ae);
0577:                    }
0578:                    if (ae.pendingAction != AgentEntry.NONE) {
0579:                        // agent is leaving this node?
0580:                        errorMsg = "Unable to accept remote agent " + id
0581:                                + ", a move is already in progress: " + ae;
0582:                    } else if (ae.isRegistered) {
0583:                        // already moving or adding the agent?
0584:                        errorMsg = "Unable to accept remote agent " + id
0585:                                + ", that agent is already on node " + nodeId
0586:                                + ": " + ae;
0587:                    } else {
0588:                        ae.pendingAction = AgentEntry.MOVE_ARRIVAL;
0589:                        ae.control = control;
0590:                        ae.state = state;
0591:                        if (log.isDebugEnabled()) {
0592:                            if (isNew) {
0593:                                log.debug("Created new entry for agent " + id
0594:                                        + " move arrival" + " (oid: "
0595:                                        + System.identityHashCode(ae) + "): "
0596:                                        + ae);
0597:                            } else {
0598:                                log.debug("Updated entry for agent " + id
0599:                                        + " move arrival" + " (oid: "
0600:                                        + System.identityHashCode(ae) + "): "
0601:                                        + ae + ", description "
0602:                                        + objectCompare(ae.desc, ae.desc)
0603:                                        + ", agent "
0604:                                        + objectCompare(ae.agent, ae.agent));
0605:                            }
0606:                        }
0607:                    }
0608:                }
0609:
0610:                if (errorMsg != null) {
0611:                    if (log.isErrorEnabled()) {
0612:                        log.error(errorMsg);
0613:                    }
0614:                    Throwable stack = new RuntimeException(errorMsg);
0615:                    control.setStatus(AgentControl.FAILURE, stack);
0616:                    blackboard.publishChange(control);
0617:                    return;
0618:                }
0619:
0620:                MobilitySupportImpl support = new MobilitySupportImpl(null,
0621:                        null, control, id, moveTicket);
0622:
0623:                AbstractHandler h = new ArrivalHandler(support, desc);
0624:
0625:                queue(id, h, support);
0626:            }
0627:
0628:            private void change_transfer(AgentControl control,
0629:                    TransferTicket transferTicket) {
0630:                if (log.isDetailEnabled()) {
0631:                    log.detail("change_transfer(" + control + ", "
0632:                            + transferTicket + ")");
0633:                }
0634:
0635:                MoveTicket moveTicket = transferTicket.getMoveTicket();
0636:
0637:                MessageAddress id = moveTicket.getMobileAgent();
0638:
0639:                MessageAddress origNode = moveTicket.getOriginNode();
0640:                if ((origNode != null) && (!(nodeId.equals(origNode)))) {
0641:                    if (origNode.equals(control.getSource())) {
0642:                        // ignore, changed by this plugin
0643:                        return;
0644:                    } else {
0645:                        if (log.isErrorEnabled()) {
0646:                            log.error("Invalid change in transfer "
0647:                                    + control.getUID()
0648:                                    + ", intended for origin node " + origNode
0649:                                    + ", not local node " + nodeId);
0650:                        }
0651:                        return;
0652:                    }
0653:                }
0654:
0655:                int status = control.getStatusCode();
0656:                if (status == AgentEntry.NONE) {
0657:                    if (log.isDebugEnabled()) {
0658:                        log.debug("Ignore change with no status for transfer "
0659:                                + control.getUID());
0660:                    }
0661:                    return;
0662:                }
0663:
0664:                boolean isNack = (status != AgentControl.MOVED);
0665:                Throwable stack = (isNack ? control.getFailureStackTrace()
0666:                        : null);
0667:
0668:                // force GC of the captured agent state
0669:                //
0670:                // this is important, otherwise the state will be
0671:                // transfered again when we publish-remove the
0672:                // transfer-control object.
0673:                transferTicket.clearState();
0674:
0675:                // remove the completed transfer
0676:                //
0677:                // this may complicate debugging, but it helps ensure
0678:                // GC of these transfer-controls even if the original
0679:                // move-control is never removed.  The other option is
0680:                // to wait for the removal of the move-control.
0681:                blackboard.publishRemove(control);
0682:
0683:                // make sure agent is not registered, lock in arrival
0684:                String errorMsg = null;
0685:                MobilityClient agent = null;
0686:                synchronized (entries) {
0687:                    AgentEntry ae = (AgentEntry) entries.get(id);
0688:                    if (ae == null) {
0689:                        // no such control request
0690:                        errorMsg = "Unknown agent " + id + " on node " + nodeId
0691:                                + ", so unable to process move "
0692:                                + (isNack ? "failure" : "success")
0693:                                + " response";
0694:                    } else if (ae.pendingAction != AgentEntry.MOVE_DEPART) {
0695:                        // agent is not moving, so we're not expecting an [n]ack
0696:                        errorMsg = "Agent " + id + " is not moving on node "
0697:                                + nodeId + ", so unable to process move "
0698:                                + (isNack ? "failure" : "success")
0699:                                + " response";
0700:                    } else if (!(ae.isRegistered)) {
0701:                        // expecting the agent to stay registered during control
0702:                        // since unregister is in agent's "stop()".
0703:                        errorMsg = "Agent " + id + " on node " + nodeId
0704:                                + " is no longer registered";
0705:                    } else {
0706:                        agent = ae.agent;
0707:                        ae.pendingAction = AgentEntry.MOVE_CONFIRM;
0708:                        ae.control = control;
0709:                    }
0710:                }
0711:
0712:                if (errorMsg != null) {
0713:                    if (log.isErrorEnabled()) {
0714:                        log.error(errorMsg, stack);
0715:                    }
0716:                    return;
0717:                }
0718:
0719:                // find the original "move" control
0720:                UID moveControlUID = control.getOwnerUID();
0721:                AgentControl moveControl = findAgentControl(moveControlUID);
0722:                if (moveControl == null) {
0723:                    if (log.isWarnEnabled()) {
0724:                        log.warn("Agent " + id + " control request "
0725:                                + moveControlUID + " for transfer "
0726:                                + control.getUID() + " not found in node "
0727:                                + nodeId + "'s blackboard, "
0728:                                + " will be unable to set the control status, "
0729:                                + " but will complete the control anyways");
0730:                    }
0731:                }
0732:
0733:                MobilitySupportImpl support = new MobilitySupportImpl(agent,
0734:                        moveControl, control, id, moveTicket);
0735:
0736:                AbstractHandler h;
0737:                if (isNack) {
0738:                    h = new NackHandler(support, agent, stack);
0739:                } else {
0740:                    h = new AckHandler(support, agent);
0741:                }
0742:
0743:                queue(id, h, support);
0744:            }
0745:
0746:            private void remove_transfer(AgentControl control,
0747:                    TransferTicket transferTicket) {
0748:                if (log.isDetailEnabled()) {
0749:                    log.detail("remove_transfer(" + control + ", "
0750:                            + transferTicket + ")");
0751:                }
0752:            }
0753:
0754:            //
0755:            //
0756:            //
0757:
0758:            private void queue(MessageAddress id, AbstractHandler h,
0759:                    MobilitySupportImpl support) {
0760:                queue(id, h, support.pendingTuples);
0761:            }
0762:
0763:            private void queue(final MessageAddress id, final Runnable r,
0764:                    final List pendingTuples) {
0765:                // ensure queue cleanup
0766:                Runnable r2 = new Runnable() {
0767:                    public void run() {
0768:                        try {
0769:                            r.run();
0770:                        } finally {
0771:                            dequeue(id, r, pendingTuples);
0772:                        }
0773:                    }
0774:
0775:                    public String toString() {
0776:                        return r.toString();
0777:                    }
0778:                };
0779:                queue(r2);
0780:            }
0781:
0782:            private void dequeue(MessageAddress id, Runnable r2,
0783:                    final List pendingTuples) {
0784:                if (r2 instanceof  DispatchRemoteHandler) {
0785:                    // leave the entry, we're waiting for the response.
0786:                    if (log.isDebugEnabled()) {
0787:                        synchronized (entries) {
0788:                            AgentEntry ae = (AgentEntry) entries.get(id);
0789:                            log.debug("Completed move action <dispatch> " + r2
0790:                                    + " for agent " + id + " on node " + nodeId
0791:                                    + ", keeping the move entry" + " (oid: "
0792:                                    + System.identityHashCode(ae) + "): " + ae);
0793:                        }
0794:                    }
0795:                } else {
0796:                    // remove moving flag
0797:                    synchronized (entries) {
0798:                        AgentEntry ae = (AgentEntry) entries.get(id);
0799:                        if (ae == null) {
0800:                            // aborted handler?
0801:                            if (log.isDebugEnabled()) {
0802:                                log.debug("Completed move action " + r2
0803:                                        + " for agent " + id + " on node "
0804:                                        + nodeId + ", but the entry is null");
0805:                            }
0806:                        } else {
0807:                            ae.pendingAction = AgentEntry.NONE;
0808:                            if (!(ae.isRegistered)) {
0809:                                entries.remove(id);
0810:                                if (log.isDebugEnabled()) {
0811:                                    log.debug("Completed move action " + r2
0812:                                            + " for agent " + id + " on node "
0813:                                            + nodeId + ", removed entry"
0814:                                            + " (oid: "
0815:                                            + System.identityHashCode(ae)
0816:                                            + "): " + ae);
0817:                                }
0818:                            } else {
0819:                                // keep in table for future moves
0820:                                if (log.isDebugEnabled()) {
0821:                                    log.debug("Completed move action " + r2
0822:                                            + " for agent " + id + " on node "
0823:                                            + nodeId + ", keeping the entry "
0824:                                            + " (oid: "
0825:                                            + System.identityHashCode(ae)
0826:                                            + "): " + ae);
0827:                                }
0828:                            }
0829:                        }
0830:                    }
0831:                }
0832:                // queue pending blackboard operations
0833:                if (!pendingTuples.isEmpty()) {
0834:                    Runnable r3 = new Runnable() {
0835:                        public void run() {
0836:                            for (Iterator iter = pendingTuples.iterator(); iter
0837:                                    .hasNext();) {
0838:                                PendingTuple pt = (PendingTuple) iter.next();
0839:                                if (log.isDebugEnabled()) {
0840:                                    log.debug("Blackboard " + pt);
0841:                                }
0842:                                Object obj = pt.obj;
0843:                                switch (pt.op) {
0844:                                case PendingTuple.ADD:
0845:                                    blackboard.publishAdd(obj);
0846:                                    break;
0847:                                case PendingTuple.CHANGE:
0848:                                    blackboard.publishChange(obj);
0849:                                    break;
0850:                                case PendingTuple.REMOVE:
0851:                                    blackboard.publishRemove(obj);
0852:                                    break;
0853:                                }
0854:                            }
0855:                        }
0856:                    };
0857:                    fireLater(r3);
0858:                }
0859:            }
0860:
0861:            private class AgentEntry {
0862:
0863:                /**
0864:                 * pendingAction constants.
0865:                 */
0866:                public static final int NONE = 0;
0867:                // local agent add
0868:                public static final int ADD = 1;
0869:                // local agent remove
0870:                public static final int REMOVE = 2;
0871:                // sender-side agent is moving away
0872:                public static final int MOVE_DEPART = 3;
0873:                // target-side agent is being added
0874:                public static final int MOVE_ARRIVAL = 4;
0875:                // sender-side process the move response
0876:                public static final int MOVE_CONFIRM = 5;
0877:
0878:                public final MessageAddress id;
0879:                public ComponentDescription desc;
0880:                public Object state;
0881:                public MobilityClient agent;
0882:
0883:                public boolean isRegistered;
0884:
0885:                public int pendingAction = NONE;
0886:
0887:                public AgentControl control;
0888:
0889:                public AgentEntry(MessageAddress id) {
0890:                    this .id = id;
0891:                }
0892:
0893:                public String getPendingActionAsString() {
0894:                    switch (pendingAction) {
0895:                    case NONE:
0896:                        return "none";
0897:                    case ADD:
0898:                        return "add";
0899:                    case REMOVE:
0900:                        return "remove";
0901:                    case MOVE_DEPART:
0902:                        return "move_depart";
0903:                    case MOVE_ARRIVAL:
0904:                        return "move_arrival";
0905:                    case MOVE_CONFIRM:
0906:                        return "move_confirm";
0907:                    default:
0908:                        return "?";
0909:                    }
0910:                }
0911:
0912:                public String toString() {
0913:                    return "control request for agent "
0914:                            + id
0915:                            + ", state is <"
0916:                            + (isRegistered ? "" : "not ")
0917:                            + "registered + "
0918:                            + getPendingActionAsString()
0919:                            + ">"
0920:                            + ((control != null) ? (", with ticket " + control
0921:                                    .getAbstractTicket()) : "");
0922:                }
0923:            }
0924:
0925:            private static class PendingTuple {
0926:                public static final int ADD = 0;
0927:                public static final int CHANGE = 1;
0928:                public static final int REMOVE = 2;
0929:                public final Object obj;
0930:                public final int op;
0931:
0932:                public PendingTuple(int op, Object obj) {
0933:                    this .op = op;
0934:                    this .obj = obj;
0935:                }
0936:
0937:                public String toString() {
0938:                    return "queued "
0939:                            + ((op == ADD) ? "add" : (op == CHANGE) ? "change"
0940:                                    : (op == REMOVE) ? "remove" : "?")
0941:                            + " of object "
0942:                            + ((obj instanceof  UniqueObject) ? ("with uid " + (((UniqueObject) obj)
0943:                                    .getUID()))
0944:                                    : (obj != null) ? obj.toString() : "null");
0945:                }
0946:            }
0947:
0948:            private class MobilitySupportImpl extends AbstractMobilitySupport {
0949:
0950:                private final List pendingTuples = new ArrayList(3);
0951:
0952:                private MobilityClient agent;
0953:                private AgentControl moveControl;
0954:                private AgentControl transferControl;
0955:
0956:                public MobilitySupportImpl(MobilityClient agent,
0957:                        AgentControl moveControl, AgentControl transferControl,
0958:                        MessageAddress id, MoveTicket moveTicket) {
0959:                    super (id, RootMobilityPlugin.this .nodeId, moveTicket,
0960:                            RootMobilityPlugin.this .log);
0961:                    this .agent = agent;
0962:                    this .moveControl = moveControl;
0963:                    this .transferControl = transferControl;
0964:                }
0965:
0966:                public void onDispatch() {
0967:                    MessageAddress destNode = moveTicket.getDestinationNode();
0968:                    try {
0969:                        agent.onDispatch(destNode);
0970:                    } catch (MobilityException me) {
0971:                        throw me;
0972:                    } catch (Exception e) {
0973:                        if (RootMobilityPlugin.this .log.isErrorEnabled()) {
0974:                            RootMobilityPlugin.this .log.error("Failed agent "
0975:                                    + id + " move to node " + destNode, e);
0976:                        }
0977:                    }
0978:                }
0979:
0980:                public void onArrival() {
0981:                    if (moveControl != null) {
0982:                        moveControl.setStatus(AgentControl.MOVED, null);
0983:                        publishChangeLater(moveControl);
0984:                    } else {
0985:                        if (RootMobilityPlugin.this .log.isWarnEnabled()) {
0986:                            RootMobilityPlugin.this .log
0987:                                    .warn("Unable to set move status for transfer "
0988:                                            + ((transferControl != null) ? transferControl
0989:                                                    .getUID().toString()
0990:                                                    : "<unknown>"));
0991:                        }
0992:                    }
0993:                }
0994:
0995:                public void onFailure(Throwable throwable) {
0996:                    moveControl.setStatus(AgentControl.FAILURE, throwable);
0997:                    publishChangeLater(moveControl);
0998:                }
0999:
1000:                public void onRemoval() {
1001:                }
1002:
1003:                public void setPendingModel(GenericStateModel model) {
1004:                }
1005:
1006:                public GenericStateModel takePendingModel() {
1007:                    return null;
1008:                }
1009:
1010:                public void sendTransfer(ComponentDescription desc, Object state) {
1011:                    TransferTicket transferTicket = new TransferTicket(
1012:                            moveTicket, desc, state);
1013:                    AgentControl newTC = createAgentControl(moveControl
1014:                            .getUID(), moveTicket.getDestinationNode(),
1015:                            transferTicket);
1016:                    transferControl = newTC;
1017:                    publishAddLater(newTC);
1018:                }
1019:
1020:                public void sendAck() {
1021:                    transferControl.setStatus(AgentControl.MOVED, null);
1022:                    publishChangeLater(transferControl);
1023:                }
1024:
1025:                public void sendNack(Throwable throwable) {
1026:                    transferControl.setStatus(AgentControl.FAILURE, throwable);
1027:                    publishChangeLater(transferControl);
1028:                }
1029:
1030:                public void addAgent(ComponentDescription desc) {
1031:                    StateTuple tuple = new StateTuple(desc, null);
1032:                    agentContainer.addAgent(id, tuple);
1033:                }
1034:
1035:                public void removeAgent() {
1036:                    agentContainer.removeAgent(id);
1037:                }
1038:
1039:                private void publishAddLater(Object o) {
1040:                    addPendingTuple(PendingTuple.ADD, o);
1041:                }
1042:
1043:                private void publishChangeLater(Object o) {
1044:                    addPendingTuple(PendingTuple.CHANGE, o);
1045:                }
1046:
1047:                //      private void publishRemoveLater(Object o) {
1048:                //        addPendingTuple(PendingTuple.REMOVE, o);
1049:                //      }
1050:
1051:                private void addPendingTuple(int op, Object o) {
1052:                    addPendingTuple(new PendingTuple(op, o));
1053:                }
1054:
1055:                private void addPendingTuple(PendingTuple pt) {
1056:                    if (pt == null) {
1057:                        throw new IllegalArgumentException("null pt");
1058:                    }
1059:                    pendingTuples.add(pt);
1060:                }
1061:            }
1062:
1063:            private class AddAgentRunner implements  Runnable {
1064:
1065:                public final List pendingTuples = new ArrayList(1);
1066:
1067:                private final MessageAddress id;
1068:                private final AgentControl control;
1069:                private final ComponentDescription desc;
1070:
1071:                public AddAgentRunner(MessageAddress id, AgentControl control,
1072:                        ComponentDescription desc) {
1073:                    this .id = id;
1074:                    this .control = control;
1075:                    this .desc = desc;
1076:                }
1077:
1078:                public void run() {
1079:
1080:                    // add into this node
1081:                    if (log.isInfoEnabled()) {
1082:                        log.info("Add agent " + id + " to node " + nodeId);
1083:                    }
1084:
1085:                    int resultState;
1086:                    Throwable resultStack = null;
1087:                    try {
1088:
1089:                        StateTuple tuple = new StateTuple(desc, null);
1090:                        agentContainer.addAgent(id, tuple);
1091:
1092:                        // success!
1093:                        resultState = AgentControl.CREATED;
1094:
1095:                        if (log.isInfoEnabled()) {
1096:                            log
1097:                                    .info("Added agent " + id + " to node "
1098:                                            + nodeId);
1099:                        }
1100:
1101:                    } catch (Exception e) {
1102:                        // either already exists or unable to add
1103:                        //
1104:                        // HACK: check the exception message
1105:                        String msg = e.getMessage();
1106:                        if (msg != null && msg.indexOf(" already exists") > 0) {
1107:                            // already exists
1108:                            resultState = AgentControl.ALREADY_EXISTS;
1109:                            if (log.isErrorEnabled()) {
1110:                                log.error("Agent " + id
1111:                                        + " already exists on node " + nodeId);
1112:                            }
1113:                        } else {
1114:                            // couldn't add
1115:                            resultState = AgentControl.FAILURE;
1116:                            resultStack = e;
1117:                            if (log.isErrorEnabled()) {
1118:                                log.error("Unable to add agent " + id, e);
1119:                            }
1120:                        }
1121:                    }
1122:
1123:                    // set our response state
1124:                    control.setStatus(resultState, resultStack);
1125:
1126:                    // publish-change later
1127:                    PendingTuple pt = new PendingTuple(PendingTuple.CHANGE,
1128:                            control);
1129:                    pendingTuples.add(pt);
1130:                }
1131:            }
1132:
1133:            private class RemoveAgentRunner implements  Runnable {
1134:
1135:                public final List pendingTuples = new ArrayList(1);
1136:
1137:                private final MessageAddress id;
1138:                private final AgentControl control;
1139:
1140:                public RemoveAgentRunner(MessageAddress id, AgentControl control) {
1141:                    this .id = id;
1142:                    this .control = control;
1143:                }
1144:
1145:                public void run() {
1146:
1147:                    // remove agent from this node
1148:                    if (log.isInfoEnabled()) {
1149:                        log.info("Remove agent " + id + " from node " + nodeId);
1150:                    }
1151:
1152:                    int resultState;
1153:                    Throwable resultStack = null;
1154:                    try {
1155:                        agentContainer.removeAgent(id);
1156:                        // success!
1157:                        resultState = AgentControl.REMOVED;
1158:                        if (log.isInfoEnabled()) {
1159:                            log.info("Removed agent " + id + " from node "
1160:                                    + nodeId);
1161:                        }
1162:                    } catch (Exception e) {
1163:                        // either already removed or unable to remove
1164:                        //
1165:                        // HACK: check the exception message
1166:                        String msg = e.getMessage();
1167:                        if (msg != null && msg.indexOf(" is not loaded") > 0) {
1168:                            // already exists
1169:                            resultState = AgentControl.DOES_NOT_EXIST;
1170:                            if (log.isErrorEnabled()) {
1171:                                log.error("Agent " + id + " is not on node "
1172:                                        + nodeId);
1173:                            }
1174:                        } else {
1175:                            // couldn't add
1176:                            resultState = AgentControl.FAILURE;
1177:                            resultStack = e;
1178:                            if (log.isErrorEnabled()) {
1179:                                log.error("Unable to remove agent " + id, e);
1180:                            }
1181:                        }
1182:                    }
1183:
1184:                    // set our response state
1185:                    control.setStatus(resultState, resultStack);
1186:
1187:                    // publish-change later
1188:                    PendingTuple pt = new PendingTuple(PendingTuple.CHANGE,
1189:                            control);
1190:                    pendingTuples.add(pt);
1191:                }
1192:            }
1193:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.