Source Code Cross Referenced for LeaseManager.java in » Science » Cougaar12_4 » org » cougaar » core » wp » resolver » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.core.wp.resolver 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * <copyright>
0003:         *  
0004:         *  Copyright 2002-2004 BBNT Solutions, LLC
0005:         *  under sponsorship of the Defense Advanced Research Projects
0006:         *  Agency (DARPA).
0007:         * 
0008:         *  You can redistribute this software and/or modify it under the
0009:         *  terms of the Cougaar Open Source License as published on the
0010:         *  Cougaar Open Source Website (www.cougaar.org).
0011:         * 
0012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023:         *  
0024:         * </copyright>
0025:         */
0026:
0027:        package org.cougaar.core.wp.resolver;
0028:
0029:        import java.util.ArrayList;
0030:        import java.util.Collections;
0031:        import java.util.HashMap;
0032:        import java.util.Iterator;
0033:        import java.util.List;
0034:        import java.util.Map;
0035:        import java.util.Set;
0036:        import org.cougaar.core.component.Component;
0037:        import org.cougaar.core.component.ServiceBroker;
0038:        import org.cougaar.core.component.ServiceProvider;
0039:        import org.cougaar.core.component.ServiceRevokedListener;
0040:        import org.cougaar.core.mts.MessageAddress;
0041:        import org.cougaar.core.node.NodeControlService;
0042:        import org.cougaar.core.service.LoggingService;
0043:        import org.cougaar.core.service.ThreadService;
0044:        import org.cougaar.core.service.UIDService;
0045:        import org.cougaar.core.service.wp.AddressEntry;
0046:        import org.cougaar.core.service.wp.Request;
0047:        import org.cougaar.core.service.wp.Response;
0048:        import org.cougaar.core.thread.Schedulable;
0049:        import org.cougaar.core.util.UID;
0050:        import org.cougaar.core.wp.Parameters;
0051:        import org.cougaar.core.wp.Timestamp;
0052:        import org.cougaar.core.wp.bootstrap.Bundle;
0053:        import org.cougaar.core.wp.bootstrap.BundleService;
0054:        import org.cougaar.util.GenericStateModelAdapter;
0055:        import org.cougaar.util.RarelyModifiedList;
0056:
0057:        /**
0058:         * This component watches for bind/unbind requests and maintains
0059:         * the leases in the server.
0060:         */
0061:        public class LeaseManager extends GenericStateModelAdapter implements 
0062:                Component {
0063:
0064:            private LeaserConfig config;
0065:
0066:            // name -> ActiveLease
0067:            // Map<String, ActiveLease>
0068:            private final Map leases = new HashMap();
0069:
0070:            private ServiceBroker sb;
0071:            private ServiceBroker rootsb;
0072:
0073:            private LoggingService logger;
0074:            private ThreadService threadService;
0075:            private UIDService uidService;
0076:            private ModifyService modifyService;
0077:
0078:            private final ModifyService.Client myClient = new ModifyService.Client() {
0079:                public void modifyAnswer(long baseTime, Map m) {
0080:                    LeaseManager.this .modifyAnswer(baseTime, m);
0081:                }
0082:            };
0083:
0084:            private LeaseSP leaseSP;
0085:            private BundleSP bundleSP;
0086:
0087:            private final RarelyModifiedList listeners = new RarelyModifiedList();
0088:
0089:            //
0090:            // renew leases:
0091:            //
0092:
0093:            private Schedulable renewLeasesThread;
0094:
0095:            public void setParameter(Object o) {
0096:                configure(o);
0097:            }
0098:
0099:            public void setServiceBroker(ServiceBroker sb) {
0100:                this .sb = sb;
0101:            }
0102:
0103:            public void setLoggingService(LoggingService logger) {
0104:                this .logger = logger;
0105:            }
0106:
0107:            public void setThreadService(ThreadService threadService) {
0108:                this .threadService = threadService;
0109:            }
0110:
0111:            public void setUIDService(UIDService uidService) {
0112:                this .uidService = uidService;
0113:            }
0114:
0115:            private void configure(Object o) {
0116:                if (config != null) {
0117:                    return;
0118:                }
0119:                config = new LeaserConfig(o);
0120:            }
0121:
0122:            public void load() {
0123:                super .load();
0124:
0125:                configure(null);
0126:
0127:                // register for lookups
0128:                modifyService = (ModifyService) sb.getService(myClient,
0129:                        ModifyService.class, null);
0130:                if (modifyService == null) {
0131:                    throw new RuntimeException("Unable to obtain ModifyService");
0132:                }
0133:
0134:                Runnable renewLeasesRunner = new Runnable() {
0135:                    public void run() {
0136:                        // assert (thread == renewLeasesThread);
0137:                        renewLeases();
0138:                    }
0139:                };
0140:                renewLeasesThread = threadService.getThread(this ,
0141:                        renewLeasesRunner, "White pages server renew leases");
0142:                renewLeasesThread.schedule(config.checkLeasesPeriod);
0143:
0144:                NodeControlService ncs = (NodeControlService) sb.getService(
0145:                        this , NodeControlService.class, null);
0146:                if (ncs != null) {
0147:                    rootsb = ncs.getRootServiceBroker();
0148:                    sb.releaseService(this , NodeControlService.class, ncs);
0149:                }
0150:
0151:                // advertise our services
0152:                leaseSP = new LeaseSP();
0153:                sb.addService(LeaseService.class, leaseSP);
0154:                bundleSP = new BundleSP();
0155:                ServiceBroker bundleSB = (rootsb == null ? sb : rootsb);
0156:                bundleSB.addService(BundleService.class, bundleSP);
0157:            }
0158:
0159:            public void unload() {
0160:                // cancel existing scheduled threads
0161:                renewLeasesThread.cancel();
0162:
0163:                // release services
0164:                if (bundleSP != null) {
0165:                    ServiceBroker bundleSB = (rootsb == null ? sb : rootsb);
0166:                    bundleSB.revokeService(BundleService.class, bundleSP);
0167:                    bundleSP = null;
0168:                }
0169:                if (leaseSP != null) {
0170:                    sb.revokeService(LeaseService.class, leaseSP);
0171:                    leaseSP = null;
0172:                }
0173:                if (modifyService != null) {
0174:                    sb.releaseService(myClient, ModifyService.class,
0175:                            modifyService);
0176:                    modifyService = null;
0177:                }
0178:                if (uidService != null) {
0179:                    sb.releaseService(this , UIDService.class, uidService);
0180:                    uidService = null;
0181:                }
0182:                if (threadService != null) {
0183:                    // halt our threads?
0184:                    sb.releaseService(this , ThreadService.class, threadService);
0185:                    threadService = null;
0186:                }
0187:                if (logger != null) {
0188:                    sb.releaseService(this , LoggingService.class, logger);
0189:                    logger = null;
0190:                }
0191:                super .unload();
0192:            }
0193:
0194:            private Bundle getBundle(String name) {
0195:                synchronized (leases) {
0196:                    ActiveLease lease = (ActiveLease) leases.get(name);
0197:                    return asBundle(name, lease);
0198:                }
0199:            }
0200:
0201:            private Map getAllBundles() {
0202:                synchronized (leases) {
0203:                    Map ret = new HashMap(leases.size());
0204:                    for (Iterator iter = leases.entrySet().iterator(); iter
0205:                            .hasNext();) {
0206:                        Map.Entry me = (Map.Entry) iter.next();
0207:                        String name = (String) me.getKey();
0208:                        ActiveLease lease = (ActiveLease) me.getValue();
0209:                        Bundle bundle = asBundle(name, lease);
0210:                        if (bundle == null) {
0211:                            continue;
0212:                        }
0213:                        ret.put(name, bundle);
0214:                    }
0215:                    return ret;
0216:                }
0217:            }
0218:
0219:            private Bundle asBundle(String name, ActiveLease lease) {
0220:                if (lease == null) {
0221:                    return null;
0222:                }
0223:                Record record = lease.record;
0224:                if (record == null) {
0225:                    return null;
0226:                }
0227:                UID uid = record.getUID();
0228:                long ttd;
0229:                if (lease.sendTime > 0) {
0230:                    // still pending
0231:                    ttd = config.minBundleTTD;
0232:                } else {
0233:                    // use server ttd
0234:                    ttd = lease.expireTime - lease.boundTime;
0235:                    if (ttd < config.minBundleTTD) {
0236:                        ttd = config.minBundleTTD;
0237:                    }
0238:                }
0239:                Map entries = (Map) record.getData();
0240:                Bundle bundle = new Bundle(name, uid, ttd, entries);
0241:                return bundle;
0242:            }
0243:
0244:            private void register(BundleService.Client bsc) {
0245:                listeners.add(bsc);
0246:                bsc.addAll(getAllBundles());
0247:            }
0248:
0249:            private void unregister(BundleService.Client bsc) {
0250:                listeners.remove(bsc);
0251:            }
0252:
0253:            private void submit(Response res, String agent) {
0254:                // watch for bind/unbind
0255:                Request req = res.getRequest();
0256:                if (req instanceof  Request.Bind) {
0257:                    if (req.hasOption(Request.CACHE_ONLY)) {
0258:                        res.setResult(new Long(Long.MAX_VALUE));
0259:                    } else {
0260:                        bind(res, agent, (Request.Bind) req);
0261:                    }
0262:                } else if (req instanceof  Request.Unbind) {
0263:                    if (req.hasOption(Request.CACHE_ONLY)) {
0264:                        res.setResult(Boolean.TRUE);
0265:                    } else {
0266:                        unbind(res, agent, (Request.Unbind) req);
0267:                    }
0268:                } else {
0269:                    // ignore
0270:                }
0271:            }
0272:
0273:            private void bind(Response res, String agent, Request.Bind req) {
0274:                long activeExpire = -1;
0275:                List cancelledResponses = null;
0276:
0277:                boolean createNewLease = false;
0278:                String unifiedAgent = null;
0279:                Record record = null;
0280:                Bundle bundle = null;
0281:
0282:                AddressEntry ae = req.getAddressEntry();
0283:                String name = ae.getName();
0284:                String type = ae.getType();
0285:
0286:                if (logger.isInfoEnabled() && !req.isOverWrite()) {
0287:                    logger.info("Warning: treating bind as rebind: " + req);
0288:                }
0289:
0290:                synchronized (leases) {
0291:                    AddressEntry oldAE = null;
0292:                    List oldResponses = null;
0293:                    Map oldData = null;
0294:
0295:                    ActiveLease lease = (ActiveLease) leases.get(name);
0296:                    if (lease != null) {
0297:                        // get the bound/pending lease's entry
0298:                        oldData = (Map) lease.record.getData();
0299:                        oldAE = (oldData == null ? (null)
0300:                                : (AddressEntry) oldData.get(type));
0301:                    }
0302:
0303:                    if (lease == null) {
0304:                        // new bind-pending lease
0305:                        createNewLease = true;
0306:                    } else if (ae.equals(oldAE)) {
0307:                        // the lease is active, so we know the status
0308:                        activeExpire = lease.getExpirationTime();
0309:                        if (logger.isDetailEnabled()) {
0310:                            logger.detail("lease already active: " + lease);
0311:                        }
0312:                    } else if (!lease.isBound() && ae.equals(oldAE)) {
0313:                        // already in progress, we don't know the
0314:                        // status yet, so batch with our pending request.
0315:                        lease.addResponse(res);
0316:                        if (logger.isDetailEnabled()) {
0317:                            logger.detail("lease rebind " + ae
0318:                                    + " already in progress, batching: "
0319:                                    + lease);
0320:                        }
0321:                        // batched here
0322:                    } else {
0323:                        // cancel old lease, initiate a new one
0324:                        //
0325:                        // Note that multiple pending binds may be in progress, so
0326:                        // we must handle these outstanding requests.  There are
0327:                        // two options:
0328:                        //   a) Cancel the outstanding requests and ignore the
0329:                        //      WP's answers when they arrive
0330:                        //   b) Send both asynchronously and tell the requests
0331:                        //      their answers, even if they conflict.
0332:                        // We'll go with (a) and cancel the pending requests.
0333:                        // This is more consistent with the notion of a
0334:                        // client-side lease manager, since only one binding will
0335:                        // be maintained and conflicts are a client-side error.
0336:                        //
0337:                        // Unbind must do a similar cancel.
0338:                        //
0339:                        createNewLease = true;
0340:                        // clear the old responses for cancelling
0341:                        cancelledResponses = lease.takeResponses(type);
0342:                        // keep the rest
0343:                        oldResponses = lease.takeResponses();
0344:                    }
0345:
0346:                    if (createNewLease) {
0347:                        // create a new lease and replace the old one
0348:                        if (oldAE != null && logger.isInfoEnabled()) {
0349:                            logger.info("Binding replacement entry " + ae
0350:                                    + " for current entry " + oldAE
0351:                                    + " in lease " + lease);
0352:                        }
0353:
0354:                        Map data;
0355:                        int oldSize = (oldData == null ? 0 : oldData.size());
0356:                        if (oldSize == 0
0357:                                || (oldSize == 1 && oldData.containsKey(type))) {
0358:                            data = Collections.singletonMap(type, ae);
0359:                            unifiedAgent = agent;
0360:                        } else {
0361:                            data = new HashMap(oldData);
0362:                            data.put(type, ae);
0363:                            data = Collections.unmodifiableMap(data);
0364:                            // figure out the "agent" owner, which is usually a mix of
0365:                            // null (for the MTS link) and a single agent (for the
0366:                            // clients within that agent).  If it's some other mix then
0367:                            // we'll generate a warning.
0368:                            String oldAgent = lease.agent;
0369:                            if (oldAgent == null) {
0370:                                unifiedAgent = agent;
0371:                            } else if (oldAgent.equals(agent) || agent == null) {
0372:                                unifiedAgent = oldAgent;
0373:                            } else {
0374:                                // conflict!
0375:                                unifiedAgent = agent;
0376:                                if (logger.isWarnEnabled()) {
0377:                                    logger.warn("Agent " + agent
0378:                                            + "'s lease request " + ae
0379:                                            + " is mixing with agent "
0380:                                            + oldAgent + "'s data for name="
0381:                                            + name + "=" + oldData
0382:                                            + ", assigning ownership to agent "
0383:                                            + agent);
0384:                                }
0385:                            }
0386:                        }
0387:                        UID uid = uidService.nextUID();
0388:                        record = new Record(uid, -1, data);
0389:                        lease = new ActiveLease(unifiedAgent, record);
0390:                        leases.put(name, lease);
0391:                        if (oldResponses != null) {
0392:                            lease.addResponses(oldResponses);
0393:                        }
0394:                        lease.addResponse(res);
0395:
0396:                        if (oldAE == null && logger.isInfoEnabled()) {
0397:                            logger.info("Binding new lease: " + lease);
0398:                        }
0399:
0400:                        bundle = asBundle(name, lease);
0401:                    }
0402:                }
0403:
0404:                if (cancelledResponses != null) {
0405:                    // cancel conflicting pending binds
0406:                    for (int i = 0, n = cancelledResponses.size(); i < n; i++) {
0407:                        Response cancelledRes = (Response) cancelledResponses
0408:                                .get(i);
0409:                        cancelledRes.setResult(req);
0410:                    }
0411:                }
0412:                if (0 < activeExpire) {
0413:                    // lease already bound
0414:                    res.setResult(new Long(activeExpire));
0415:                    return;
0416:                }
0417:
0418:                if (!createNewLease) {
0419:                    return;
0420:                }
0421:
0422:                // tell listeners
0423:                if (bundle != null) {
0424:                    List l = listeners.getUnmodifiableList();
0425:                    for (Iterator iter = l.iterator(); iter.hasNext();) {
0426:                        BundleService.Client bsc = (BundleService.Client) iter
0427:                                .next();
0428:                        bsc.add(name, bundle);
0429:                    }
0430:                }
0431:
0432:                // send
0433:                Object o = record;
0434:                if (unifiedAgent != null) {
0435:                    o = new NameTag(unifiedAgent, o);
0436:                }
0437:                Map m = Collections.singletonMap(name, o);
0438:                modifyService.modify(m);
0439:            }
0440:
0441:            private void unbind(Response res, String agent, Request.Unbind req) {
0442:                boolean bind = false;
0443:
0444:                AddressEntry ae = req.getAddressEntry();
0445:                String name = ae.getName();
0446:                String type = ae.getType();
0447:
0448:                List cancelledResponses = null;
0449:
0450:                boolean createNewLease = false;
0451:                String unifiedAgent = null;
0452:
0453:                Record record = null;
0454:                Bundle bundle = null;
0455:
0456:                synchronized (leases) {
0457:                    AddressEntry oldAE = null;
0458:                    Map oldData = null;
0459:                    List oldResponses = null;
0460:
0461:                    ActiveLease lease = (ActiveLease) leases.get(name);
0462:                    if (lease != null) {
0463:                        // get the old entry
0464:                        record = lease.record;
0465:                        oldData = (Map) record.getData();
0466:                        oldAE = (oldData == null ? (null)
0467:                                : (AddressEntry) oldData.get(type));
0468:                    }
0469:
0470:                    if (lease == null) {
0471:                        // not bound, nothing to unbind
0472:                    } else if (ae.equals(oldAE)) {
0473:                        // found exact match
0474:                        // cancel any pending requests
0475:                        //
0476:                        // this can also be used to intentionally cancel a
0477:                        // pending local bind.
0478:                        cancelledResponses = lease.takeResponses(type);
0479:                        oldResponses = lease.takeResponses();
0480:                        createNewLease = true;
0481:                    } else {
0482:                        // was not bound
0483:                    }
0484:
0485:                    if (createNewLease) {
0486:                        // create a new lease and replace the old one
0487:                        if (logger.isInfoEnabled()) {
0488:                            logger.info("Unbinding entry " + ae + " in lease "
0489:                                    + lease);
0490:                        }
0491:
0492:                        Map data;
0493:                        if (oldData == null || oldData.isEmpty()) {
0494:                            if (bind) {
0495:                                data = Collections.singletonMap(type, ae);
0496:                            } else {
0497:                                data = Collections.EMPTY_MAP;
0498:                            }
0499:                        } else {
0500:                            data = new HashMap(oldData);
0501:                            if (bind) {
0502:                                data.put(type, ae);
0503:                            } else {
0504:                                data.remove(type);
0505:                            }
0506:                            data = Collections.unmodifiableMap(data);
0507:                        }
0508:                        unifiedAgent = (lease == null ? agent : lease.agent);
0509:                        UID uid = uidService.nextUID();
0510:                        record = new Record(uid, -1, data);
0511:                        if (!bind && oldData.size() == 1) {
0512:                            leases.remove(name);
0513:                            lease = null;
0514:                        } else {
0515:                            lease = new ActiveLease(unifiedAgent, record);
0516:                            leases.put(name, lease);
0517:                            if (oldResponses != null) {
0518:                                lease.addResponses(oldResponses);
0519:                            }
0520:                        }
0521:
0522:                        if (logger.isInfoEnabled()) {
0523:                            logger.info("New lease: " + lease);
0524:                        }
0525:
0526:                        bundle = asBundle(name, lease);
0527:                    }
0528:                }
0529:
0530:                if (cancelledResponses != null) {
0531:                    // cancel conflicting pending binds
0532:                    for (int i = 0, n = cancelledResponses.size(); i < n; i++) {
0533:                        Response cancelledRes = (Response) cancelledResponses
0534:                                .get(i);
0535:                        cancelledRes.setResult(req);
0536:                    }
0537:                }
0538:
0539:                if (!bind) {
0540:                    // it's not bound from the local lease's point of view
0541:                    //
0542:                    // if it's bound in the server without our knowledge,
0543:                    // the lease expiration will unbind it for us
0544:                    res.setResult(Boolean.TRUE);
0545:                }
0546:
0547:                if (!createNewLease) {
0548:                    return;
0549:                }
0550:
0551:                // tell listeners
0552:                if (bundle != null) {
0553:                    Map entries = bundle.getEntries();
0554:                    boolean rem = (entries == null || entries.isEmpty());
0555:                    List l = listeners.getUnmodifiableList();
0556:                    for (Iterator iter = l.iterator(); iter.hasNext();) {
0557:                        BundleService.Client bsc = (BundleService.Client) iter
0558:                                .next();
0559:                        if (rem) {
0560:                            bsc.remove(name, bundle);
0561:                        } else {
0562:                            bsc.add(name, bundle);
0563:                        }
0564:                    }
0565:                }
0566:
0567:                // send
0568:                Object o = record;
0569:                if (unifiedAgent != null) {
0570:                    o = new NameTag(unifiedAgent, o);
0571:                }
0572:                Map m = Collections.singletonMap(name, o);
0573:                modifyService.modify(m);
0574:            }
0575:
0576:            private void modifyAnswer(long baseTime, Map m) {
0577:                for (Iterator iter = m.entrySet().iterator(); iter.hasNext();) {
0578:                    Map.Entry me = (Map.Entry) iter.next();
0579:                    String name = (String) me.getKey();
0580:                    Object value = me.getValue();
0581:                    if (value instanceof  Lease) {
0582:                        Lease l = (Lease) value;
0583:                        UID uid = l.getUID();
0584:                        long ttd = l.getTTD();
0585:                        leaseSuccess(name, uid, baseTime, ttd);
0586:                    } else if (value instanceof  LeaseNotKnown) {
0587:                        LeaseNotKnown lnk = (LeaseNotKnown) value;
0588:                        UID uid = lnk.getUID();
0589:                        leaseNotKnown(name, uid);
0590:                    } else if (value instanceof  LeaseDenied) {
0591:                        LeaseDenied ld = (LeaseDenied) value;
0592:                        UID uid = ld.getUID();
0593:                        Object reason = ld.getReason();
0594:                        leaseDenied(name, uid, reason);
0595:                    } else {
0596:                        if (logger.isErrorEnabled()) {
0597:                            logger.error("Unexpected modify answer: (baseTime="
0598:                                    + Timestamp.toString(baseTime)
0599:                                    + ", name="
0600:                                    + name
0601:                                    + ", value="
0602:                                    + (value == null ? "" : "("
0603:                                            + value.getClass().getName() + ")")
0604:                                    + value);
0605:                        }
0606:                    }
0607:                }
0608:            }
0609:
0610:            private boolean matchesLease(String name, UID uid,
0611:                    ActiveLease lease, String info) {
0612:
0613:                if (lease != null && uid.equals(lease.record.getUID())) {
0614:                    return true;
0615:                }
0616:
0617:                // ignore the response.
0618:                //
0619:                // The ModifyService usually protects us against this, but
0620:                // sometimes races can occur.
0621:                //
0622:                // if the lease is null:
0623:                //   either we never bound this entry (e.g. restart) or we've
0624:                //   recently unbound the entry and a stale ack/renewal has
0625:                //   raced back.
0626:                // or
0627:                //   we sent a bind followed by a replacement bind, and we're
0628:                //   waiting for the ack on that second bind.  This is similar
0629:                //   to the above "lease == null" race.
0630:                if (logger.isDebugEnabled()) {
0631:                    logger.debug("Ignoring a lease answer that we didn't send?"
0632:                            + " info=" + info + " name=" + name + " uid=" + uid
0633:                            + " lease=" + lease);
0634:                }
0635:                return false;
0636:            }
0637:
0638:            private void leaseSuccess(String name, UID uid, long baseTime,
0639:                    long ttd) {
0640:                List responses;
0641:                synchronized (leases) {
0642:                    ActiveLease lease = (ActiveLease) leases.get(name);
0643:                    if (!matchesLease(name, uid, lease, "success")) {
0644:                        return;
0645:                    }
0646:                    responses = leaseSuccess(lease, baseTime, ttd);
0647:                    if (responses == null || responses.isEmpty()) {
0648:                        return;
0649:                    }
0650:                }
0651:                Object result = new Long(baseTime + ttd);
0652:                for (int i = 0, n = responses.size(); i < n; i++) {
0653:                    Response res = (Response) responses.get(i);
0654:                    res.setResult(result);
0655:                }
0656:            }
0657:
0658:            private List leaseSuccess(ActiveLease lease, long baseTime, long ttd) {
0659:
0660:                long ttl = baseTime + ttd;
0661:
0662:                boolean renewal = (0 < lease.expireTime);
0663:
0664:                // good, we've created a new lease or
0665:                // renewed an existing lease
0666:                long now = System.currentTimeMillis();
0667:                renewed(lease, now, ttl);
0668:
0669:                if (renewal) {
0670:                    if (logger.isDebugEnabled()) {
0671:                        logger.debug("Renewed lease=" + lease);
0672:                    }
0673:                } else {
0674:                    if (logger.isInfoEnabled()) {
0675:                        logger.info("Established lease: " + lease);
0676:                    }
0677:                }
0678:
0679:                return lease.takeResponses();
0680:            }
0681:
0682:            private void leaseNotKnown(String name, UID uid) {
0683:                synchronized (leases) {
0684:                    ActiveLease lease = (ActiveLease) leases.get(name);
0685:                    if (!matchesLease(name, uid, lease, "not-known")) {
0686:                        return;
0687:                    }
0688:                    leaseNotKnown(lease, name, uid);
0689:                }
0690:                // we don't tell anyone, since we resend the uid-based
0691:                // renewal with a full record-based renewal
0692:            }
0693:
0694:            private void leaseNotKnown(ActiveLease lease, String name, UID uid) {
0695:
0696:                // send again, but this time send the full record instead
0697:                // of just the UID
0698:                String agent = lease.agent;
0699:                Record record = lease.record;
0700:
0701:                // FIXME what if record.ttd < 0?  shouldn't happen...
0702:
0703:                // FIXME tag lease?
0704:
0705:                if (logger.isDebugEnabled()) {
0706:                    logger.debug("Resending lease-not-known uid=" + uid
0707:                            + " for our active lease " + lease);
0708:                }
0709:
0710:                Object o = record;
0711:                if (agent != null) {
0712:                    o = new NameTag(agent, o);
0713:                }
0714:                Map m = Collections.singletonMap(name, o);
0715:                modifyService.modify(m);
0716:            }
0717:
0718:            private void leaseDenied(String name, UID uid, Object reason) {
0719:                List responses;
0720:                synchronized (leases) {
0721:                    ActiveLease lease = (ActiveLease) leases.get(name);
0722:                    if (!matchesLease(name, uid, lease, "denied")) {
0723:                        return;
0724:                    }
0725:                    responses = leaseDenied(lease, name, uid, reason);
0726:                    if (responses == null || responses.isEmpty()) {
0727:                        return;
0728:                    }
0729:                }
0730:                for (int i = 0, n = responses.size(); i < n; i++) {
0731:                    Response res = (Response) responses.get(i);
0732:                    res.getRequest();
0733:                    res.setResult(reason);
0734:                }
0735:            }
0736:
0737:            private List leaseDenied(ActiveLease lease, String name, UID uid,
0738:                    Object reason) {
0739:
0740:                // we've lost all our lease entries
0741:                //
0742:                // see the Record javadocs for future "bind-only" enhancements
0743:
0744:                leases.remove(name);
0745:
0746:                if (logger.isWarnEnabled()) {
0747:                    logger.warn("Lost lease "
0748:                            + ((0 < lease.expireTime) ? "renewal" : "creation")
0749:                            + " for " + "(name=" + name + " uid=" + uid
0750:                            + " reason=" + reason + "), dead lease is: "
0751:                            + lease);
0752:                }
0753:
0754:                // FIXME tell agent suicide watcher?
0755:
0756:                // we'll fail whatever's pending
0757:                return lease.takeResponses();
0758:            }
0759:
0760:            /**
0761:             * Check our leases and renew them if they'll expire soon.
0762:             */
0763:            private void renewLeases() {
0764:                long now;
0765:                Map m = null;
0766:                synchronized (leases) {
0767:                    now = System.currentTimeMillis();
0768:                    for (Iterator iter = leases.entrySet().iterator(); iter
0769:                            .hasNext();) {
0770:                        Map.Entry me = (Map.Entry) iter.next();
0771:                        String name = (String) me.getKey();
0772:                        ActiveLease lease = (ActiveLease) me.getValue();
0773:                        boolean renewNow = shouldRenew(name, lease, now);
0774:                        if (!renewNow) {
0775:                            continue;
0776:                        }
0777:                        String agent = lease.agent;
0778:                        UID uid = lease.record.getUID();
0779:                        if (m == null) {
0780:                            m = new HashMap();
0781:                        }
0782:                        Object o = uid;
0783:                        if (agent != null) {
0784:                            o = new NameTag(agent, o);
0785:                        }
0786:                        m.put(name, o);
0787:                    }
0788:                }
0789:
0790:                if (m != null) {
0791:                    modifyService.modify(m);
0792:                }
0793:
0794:                // run me again later
0795:                renewLeasesThread.schedule(config.checkLeasesPeriod);
0796:            }
0797:
            //
            // These are logically part of lease but require
            // access to the config of the outer class
            //
0802:
0803:            private boolean shouldRenew(String name, ActiveLease lease, long now) {
0804:                // calculate renewal time based upon:
0805:                //   expiration time
0806:                //   round-trip time for the last renewal delay
0807:                //   some slack for the above round-trip time
0808:                //   added safety in case the server forgets us
0809:                //
0810:                // here's the current guess:
0811:                //
0812:                // figure out the latest time we could renew
0813:                if (0 < lease.sendTime) {
0814:                    // we're still waiting for the last renewal ack
0815:                    if (logger.isDetailEnabled()) {
0816:                        logger.detail("lease (name=" + name + ", uid="
0817:                                + lease.record.getUID()
0818:                                + ") is still pending: " + lease.toString(now));
0819:                    }
0820:                    return false;
0821:                }
0822:                long latestRenew = lease.expireTime - lease.roundTripTime;
0823:                // weight it to be a little early
0824:                long renewalTime = (long) (lease.boundTime + (config.renewRatio * (latestRenew - lease.boundTime)));
0825:                // adjust for timer period
0826:                renewalTime -= config.checkLeasesPeriod;
0827:                if (now < renewalTime) {
0828:                    if (logger.isDetailEnabled()) {
0829:                        logger.detail("lease (name=" + name + ", uid="
0830:                                + lease.record.getUID()
0831:                                + ") doesn't need to be renewed until "
0832:                                + Timestamp.toString(renewalTime, now) + ": "
0833:                                + lease.toString(now));
0834:                    }
0835:                    return false;
0836:                }
0837:                // renew, mark the sendtime for round-trip measurement
0838:                lease.sendTime = now;
0839:                if (logger.isDebugEnabled()) {
0840:                    logger.debug("Renewing lease (name=" + name + ", uid="
0841:                            + lease.record.getUID() + ") that will expire at "
0842:                            + Timestamp.toString(lease.expireTime) + ": "
0843:                            + lease.toString(now));
0844:                }
0845:                return true;
0846:            }
0847:
0848:            private void renewed(ActiveLease lease, long now, long expTime) {
0849:                // set our timestamps
0850:                if (0 < lease.sendTime) {
0851:                    long tripTime = now - lease.sendTime;
0852:                    // soften this by averaging
0853:                    long weightedTripTime;
0854:                    if (lease.roundTripTime == 0) {
0855:                        // first time
0856:                        weightedTripTime = tripTime;
0857:                    } else {
0858:                        weightedTripTime = (long) (config.tripWeight * tripTime + (1.0 - config.tripWeight)
0859:                                * lease.roundTripTime);
0860:                    }
0861:                    lease.roundTripTime = weightedTripTime;
0862:                    lease.sendTime = 0;
0863:                } else {
0864:                    // we don't recall sending this renewal, but we'll
0865:                    // accept it anyways.
0866:                }
0867:                lease.boundTime = now;
0868:                // this expTime may be in the past, but it was a successful
0869:                // bind and our timer will renew it soon.
0870:                lease.expireTime = expTime;
0871:            }
0872:
0873:            /** config options */
0874:            private static class LeaserConfig {
0875:                public final double renewRatio;
0876:                public final double tripWeight;
0877:                public final long minBundleTTD;
0878:                public final long checkLeasesPeriod;
0879:
0880:                public LeaserConfig(Object o) {
0881:                    Parameters p = new Parameters(o,
0882:                            "org.cougaar.core.wp.resolver.lease.");
0883:                    renewRatio = p.getDouble("renewRatio", 0.75);
0884:                    tripWeight = p.getDouble("tripWeight", 0.75);
0885:                    minBundleTTD = p.getLong("minBundleTTD", 60000);
0886:                    checkLeasesPeriod = p.getLong("checkLeasesPeriod", 20000);
0887:                }
0888:            }
0889:
0890:            private static class ActiveLease {
0891:
0892:                private static final List NO_RESPONSES = Collections.EMPTY_LIST;
0893:
0894:                public final String agent;
0895:                public final Record record;
0896:
0897:                public final long bindTime;
0898:                public long boundTime;
0899:                public long sendTime;
0900:                public long roundTripTime;
0901:                public long expireTime;
0902:
0903:                private List responses;
0904:
0905:                public ActiveLease(String agent, Record record) {
0906:                    this .agent = agent;
0907:                    this .record = record;
0908:                    long now = System.currentTimeMillis();
0909:                    bindTime = now;
0910:                    boundTime = 0;
0911:                    sendTime = now;
0912:                    roundTripTime = 0;
0913:                    expireTime = 0;
0914:                    responses = NO_RESPONSES;
0915:                }
0916:
0917:                public String getAgent() {
0918:                    return agent;
0919:                }
0920:
0921:                public Record getRecord() {
0922:                    return record;
0923:                }
0924:
0925:                public boolean isBound() {
0926:                    return 0 < boundTime;
0927:                }
0928:
0929:                public long getExpirationTime() {
0930:                    return expireTime;
0931:                }
0932:
0933:                public void addResponses(List l) {
0934:                    for (int i = 0, n = l.size(); i < n; i++) {
0935:                        Response res = (Response) l.get(i);
0936:                        addResponse(res);
0937:                    }
0938:                }
0939:
0940:                public void addResponse(Response res) {
0941:                    if (!(res instanceof  Response.Bind)) {
0942:                        throw new IllegalArgumentException("Non-bind res: "
0943:                                + res);
0944:                    }
0945:                    if (isBound()) {
0946:                        throw new IllegalStateException(
0947:                                "Lease is already bound, not expecting responses: "
0948:                                        + this );
0949:                    }
0950:                    if (responses == NO_RESPONSES) {
0951:                        responses = new ArrayList(3);
0952:                    }
0953:                    responses.add(res);
0954:                }
0955:
0956:                public List takeResponses(String type) {
0957:                    if (type == null) {
0958:                        throw new IllegalArgumentException("null type");
0959:                    }
0960:                    List ret = NO_RESPONSES;
0961:                    for (int i = 0, n = responses.size(); i < n; i++) {
0962:                        Response res = (Response) responses.get(i);
0963:                        Request req = res.getRequest();
0964:                        AddressEntry ae;
0965:                        if (req instanceof  Request.Bind) {
0966:                            ae = ((Request.Bind) req).getAddressEntry();
0967:                        } else if (req instanceof  Request.Unbind) {
0968:                            ae = ((Request.Unbind) req).getAddressEntry();
0969:                        } else {
0970:                            // invalid?
0971:                            continue;
0972:                        }
0973:                        if (!type.equals(ae.getType())) {
0974:                            continue;
0975:                        }
0976:                        if (ret.isEmpty()) {
0977:                            ret = new ArrayList();
0978:                            ret.add(res);
0979:                        }
0980:                        responses.remove(i);
0981:                        --i;
0982:                        --n;
0983:                        if (n <= 0) {
0984:                            responses = NO_RESPONSES;
0985:                        }
0986:                    }
0987:                    return ret;
0988:                }
0989:
0990:                public List takeResponses() {
0991:                    List l = responses;
0992:                    responses = NO_RESPONSES;
0993:                    return l;
0994:                }
0995:
0996:                public String toString() {
0997:                    long now = System.currentTimeMillis();
0998:                    return toString(now);
0999:                }
1000:
1001:                public String toString(long now) {
1002:                    return "(lease" + " agent=" + agent + " record="
1003:                            + record.toString(bindTime, now) + " bindTime="
1004:                            + Timestamp.toString(bindTime, now) + " boundTime="
1005:                            + Timestamp.toString(boundTime, now) + " sendTime="
1006:                            + Timestamp.toString(sendTime, now)
1007:                            + " roundTripTime=" + roundTripTime
1008:                            + " expireTime="
1009:                            + Timestamp.toString(expireTime, now) + " pending["
1010:                            + responses.size() + "]=" + responses + ")";
1011:                }
1012:            }
1013:
1014:            private class LeaseSP implements  ServiceProvider {
1015:                private final LeaseService ls = new LeaseService() {
1016:                    public void submit(Response res, String agent) {
1017:                        LeaseManager.this .submit(res, agent);
1018:                    }
1019:                };
1020:
1021:                public Object getService(ServiceBroker sb, Object requestor,
1022:                        Class serviceClass) {
1023:                    if (!LeaseService.class.isAssignableFrom(serviceClass)) {
1024:                        return null;
1025:                    }
1026:                    return ls;
1027:                }
1028:
1029:                public void releaseService(ServiceBroker sb, Object requestor,
1030:                        Class serviceClass, Object service) {
1031:                }
1032:            }
1033:
1034:            private class BundleSP implements  ServiceProvider {
1035:                public Object getService(ServiceBroker sb, Object requestor,
1036:                        Class serviceClass) {
1037:                    if (!BundleService.class.isAssignableFrom(serviceClass)) {
1038:                        return null;
1039:                    }
1040:                    BundleService.Client client = (requestor instanceof  BundleService.Client ? (BundleService.Client) requestor
1041:                            : null);
1042:                    BundleServiceImpl rsi = new BundleServiceImpl(client);
1043:                    if (client != null) {
1044:                        LeaseManager.this .register(client);
1045:                    }
1046:                    return rsi;
1047:                }
1048:
1049:                public void releaseService(ServiceBroker sb, Object requestor,
1050:                        Class serviceClass, Object service) {
1051:                    if (!(service instanceof  BundleServiceImpl)) {
1052:                        return;
1053:                    }
1054:                    BundleServiceImpl rsi = (BundleServiceImpl) service;
1055:                    BundleService.Client client = rsi.client;
1056:                    if (client != null) {
1057:                        LeaseManager.this .unregister(client);
1058:                    }
1059:                }
1060:
1061:                private class BundleServiceImpl implements  BundleService {
1062:                    protected final Client client;
1063:
1064:                    public BundleServiceImpl(Client client) {
1065:                        this .client = client;
1066:                    }
1067:
1068:                    public Bundle getBundle(String name) {
1069:                        return LeaseManager.this .getBundle(name);
1070:                    }
1071:
1072:                    public Map getAllBundles() {
1073:                        return LeaseManager.this.getAllBundles();
1074:                    }
1075:                }
1076:            }
1077:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.