Source Code Cross Referenced for RootAuthority.java in  » Science » Cougaar12_4 » org » cougaar » core » wp » server » Java Source Code / Java Documentation - Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.core.wp.server 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * <copyright>
0003:         *  
0004:         *  Copyright 1997-2004 BBNT Solutions, LLC
0005:         *  under sponsorship of the Defense Advanced Research Projects
0006:         *  Agency (DARPA).
0007:         * 
0008:         *  You can redistribute this software and/or modify it under the
0009:         *  terms of the Cougaar Open Source License as published on the
0010:         *  Cougaar Open Source Website (www.cougaar.org).
0011:         * 
0012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023:         *  
0024:         * </copyright>
0025:         */
0026:
0027:        package org.cougaar.core.wp.server;
0028:
0029:        import java.net.URI;
0030:        import java.util.Arrays;
0031:        import java.util.Collection;
0032:        import java.util.Collections;
0033:        import java.util.HashMap;
0034:        import java.util.HashSet;
0035:        import java.util.Iterator;
0036:        import java.util.Map;
0037:        import java.util.Set;
0038:        import org.cougaar.core.component.Component;
0039:        import org.cougaar.core.component.ServiceBroker;
0040:        import org.cougaar.core.component.ServiceRevokedListener;
0041:        import org.cougaar.core.mts.MessageAddress;
0042:        import org.cougaar.core.service.LoggingService;
0043:        import org.cougaar.core.service.ThreadService;
0044:        import org.cougaar.core.service.UIDService;
0045:        import org.cougaar.core.service.wp.AddressEntry;
0046:        import org.cougaar.core.service.wp.WhitePagesProtectionService;
0047:        import org.cougaar.core.thread.Schedulable;
0048:        import org.cougaar.core.util.UID;
0049:        import org.cougaar.core.wp.Parameters;
0050:        import org.cougaar.core.wp.Timestamp;
0051:        import org.cougaar.core.wp.resolver.Lease;
0052:        import org.cougaar.core.wp.resolver.LeaseDenied;
0053:        import org.cougaar.core.wp.resolver.LeaseNotKnown;
0054:        import org.cougaar.core.wp.resolver.NameTag;
0055:        import org.cougaar.core.wp.resolver.Record;
0056:        import org.cougaar.core.wp.resolver.RecordIsValid;
0057:        import org.cougaar.util.GenericStateModelAdapter;
0058:
0059:        /**
0060:         * This component is the white pages server implementation.
0061:         * <p>
0062:         * This implementation supports replication but not naming
0063:         * hierarchies.
0064:         * <p>
0065:         * Refactor me!
0066:         */
0067:        public class RootAuthority extends GenericStateModelAdapter implements 
0068:                Component {
0069:            private static final int LOOKUP = 0;
0070:            private static final int MODIFY = 1;
0071:            private static final int FORWARD = 2;
0072:            private static final int PING = 3;
0073:            private static final int FORWARD_ANSWER = 4;
0074:
0075:            private RootConfig config;
0076:
0077:            private ServiceBroker sb;
0078:            private LoggingService logger;
0079:            private ThreadService threadService;
0080:            private UIDService uidService;
0081:            private WhitePagesProtectionService protectS;
0082:
0083:            private PingAckService pingAckService;
0084:            private LookupAckService lookupAckService;
0085:            private ModifyAckService modifyAckService;
0086:            private ForwardAckService forwardAckService;
0087:            private ForwardService forwardService;
0088:
0089:            private Schedulable expireThread;
0090:            private Schedulable forwardThread;
0091:
0092:            private final MyClient myClient = new MyClient();
0093:
0094:            private final Object lock = new Object();
0095:
0096:            private DirEntry rootDir;
0097:
0098:            private final Map forwardQueue = new HashMap();
0099:
0100:            public void setParameter(Object o) {
0101:                this .config = new RootConfig(o);
0102:            }
0103:
0104:            private void configure(Object o) {
0105:                if (config != null) {
0106:                    return;
0107:                }
0108:                config = new RootConfig(o);
0109:            }
0110:
0111:            public void setServiceBroker(ServiceBroker sb) {
0112:                this .sb = sb;
0113:            }
0114:
0115:            public void setLoggingService(LoggingService logger) {
0116:                this .logger = logger;
0117:            }
0118:
0119:            public void setThreadService(ThreadService threadService) {
0120:                this .threadService = threadService;
0121:            }
0122:
0123:            public void setUIDService(UIDService uidService) {
0124:                this .uidService = uidService;
0125:            }
0126:
0127:            public void load() {
0128:                super .load();
0129:
0130:                configure(null);
0131:
0132:                if (logger.isDebugEnabled()) {
0133:                    logger.debug("Loading server root authority");
0134:                }
0135:
0136:                protectS = (WhitePagesProtectionService) sb.getService(this ,
0137:                        WhitePagesProtectionService.class, null);
0138:                if (logger.isDebugEnabled()) {
0139:                    logger.debug((protectS == null ? "Didn't find" : "Found")
0140:                            + " white pages protection service");
0141:                }
0142:
0143:                // create forward timer
0144:                Runnable forwardRunner = new Runnable() {
0145:                    public void run() {
0146:                        // assert thread == forwardThread;
0147:                        forwardNow();
0148:                    }
0149:                };
0150:                forwardThread = threadService.getThread(this , forwardRunner,
0151:                        "White pages server forward leases");
0152:                forwardThread.schedule(config.forwardPeriod);
0153:
0154:                // create expiration timer
0155:                Runnable expireRunner = new Runnable() {
0156:                    public void run() {
0157:                        // assert thread == expireThread;
0158:                        expireLeases();
0159:                    }
0160:                };
0161:                expireThread = threadService.getThread(this , expireRunner,
0162:                        "White pages server expiration checker");
0163:                expireThread.schedule(config.checkExpirePeriod);
0164:
0165:                // register for server-transport
0166:                pingAckService = (PingAckService) sb.getService(myClient,
0167:                        PingAckService.class, null);
0168:                lookupAckService = (LookupAckService) sb.getService(myClient,
0169:                        LookupAckService.class, null);
0170:                modifyAckService = (ModifyAckService) sb.getService(myClient,
0171:                        ModifyAckService.class, null);
0172:                forwardAckService = (ForwardAckService) sb.getService(myClient,
0173:                        ForwardAckService.class, null);
0174:                forwardService = (ForwardService) sb.getService(myClient,
0175:                        ForwardService.class, null);
0176:                String s = (pingAckService == null ? "PingAckService"
0177:                        : lookupAckService == null ? "LookupAckService"
0178:                                : modifyAckService == null ? "ModifyAckService"
0179:                                        : forwardAckService == null ? "forwardAckService"
0180:                                                : forwardService == null ? "forwardService"
0181:                                                        : null);
0182:                if (s != null) {
0183:                    throw new RuntimeException("Unable to obtain " + s);
0184:                }
0185:            }
0186:
0187:            public void unload() {
0188:                expireThread.cancel();
0189:
0190:                // release services
0191:                if (forwardService != null) {
0192:                    sb.releaseService(myClient, ForwardService.class,
0193:                            forwardService);
0194:                    forwardService = null;
0195:                }
0196:                if (forwardAckService != null) {
0197:                    sb.releaseService(myClient, ForwardAckService.class,
0198:                            forwardAckService);
0199:                    forwardAckService = null;
0200:                }
0201:                if (modifyAckService != null) {
0202:                    sb.releaseService(myClient, ModifyAckService.class,
0203:                            modifyAckService);
0204:                    modifyAckService = null;
0205:                }
0206:                if (lookupAckService != null) {
0207:                    sb.releaseService(myClient, LookupAckService.class,
0208:                            lookupAckService);
0209:                    lookupAckService = null;
0210:                }
0211:                if (pingAckService != null) {
0212:                    sb.releaseService(myClient, PingAckService.class,
0213:                            pingAckService);
0214:                    pingAckService = null;
0215:                }
0216:                if (threadService != null) {
0217:                    sb.releaseService(this , ThreadService.class, threadService);
0218:                    threadService = null;
0219:                }
0220:                if (uidService != null) {
0221:                    sb.releaseService(this , UIDService.class, uidService);
0222:                    uidService = null;
0223:                }
0224:                if (logger != null) {
0225:                    sb.releaseService(this , LoggingService.class, logger);
0226:                    logger = null;
0227:                }
0228:
0229:                super .unload();
0230:            }
0231:
0232:            /**
0233:             * Callback from on of our registered services:<ul>
0234:             *   <li>PingAckService:    <i>PING</i></li> 
0235:             *   <li>LookupAckService:  <i>LOOKUP</i></li> 
0236:             *   <li>ModifyAckService:  <i>MODIFY</i></li> 
0237:             *   <li>ForwardAckService: <i>FORWARD</i></li> 
0238:             *   <li>ForwardService:    <i>FORWARD_ANSWER</i></li> 
0239:             * </ul>
0240:             */
0241:            private void handleAll(int action, MessageAddress clientAddr,
0242:                    long clientTime, Map m) {
0243:                if (action == PING) {
0244:                    // empty ping-ack
0245:                    pingAckService.pingAnswer(clientAddr, clientTime, null);
0246:                    return;
0247:                }
0248:
0249:                int n = (m == null ? 0 : m.size());
0250:                if (n == 0) {
0251:                    return;
0252:                }
0253:                Map answers = null;
0254:                synchronized (lock) {
0255:                    long now = System.currentTimeMillis();
0256:                    for (Iterator iter = m.entrySet().iterator(); iter
0257:                            .hasNext();) {
0258:                        Map.Entry me = (Map.Entry) iter.next();
0259:                        String name = (String) me.getKey();
0260:                        Object sendObj = me.getValue();
0261:                        Object answer = handle(action, name, sendObj, now);
0262:                        if (answer == null) {
0263:                            continue;
0264:                        }
0265:                        if (n == 1) {
0266:                            answers = Collections.singletonMap(name, answer);
0267:                        } else {
0268:                            if (answers == null) {
0269:                                answers = new HashMap();
0270:                            }
0271:                            answers.put(name, answer);
0272:                        }
0273:                    }
0274:                }
0275:                if (answers == null) {
0276:                    return;
0277:                }
0278:                // ugly switch, refactor me...
0279:                switch (action) {
0280:                case LOOKUP:
0281:                    lookupAckService.lookupAnswer(clientAddr, clientTime,
0282:                            answers);
0283:                    break;
0284:                case MODIFY:
0285:                    modifyAckService.modifyAnswer(clientAddr, clientTime,
0286:                            answers);
0287:                    break;
0288:                case FORWARD:
0289:                    forwardAckService.forwardAnswer(clientAddr, clientTime,
0290:                            answers);
0291:                    break;
0292:                case FORWARD_ANSWER:
0293:                    long maxTTD = findMaxTTD(answers);
0294:                    forwardService.forward(clientAddr, answers, maxTTD);
0295:                    break;
0296:                default:
0297:                    throw new IllegalArgumentException("Invalid action: "
0298:                            + action);
0299:                }
0300:            }
0301:
0302:            private Object handle(int action, String name, Object sendObj,
0303:                    long now) {
0304:                // assert (Thread.holdsLock(lock));
0305:
0306:                // unwrap
0307:                Object query = sendObj;
0308:                if (sendObj instanceof  NameTag) {
0309:                    NameTag nametag = (NameTag) sendObj;
0310:                    String agent = nametag.getName();
0311:                    Object obj = nametag.getObject();
0312:                    if (obj instanceof  WhitePagesProtectionService.Wrapper) {
0313:                        WhitePagesProtectionService.Wrapper wrapper = (WhitePagesProtectionService.Wrapper) obj;
0314:                        try {
0315:                            if (protectS == null) {
0316:                                throw new RuntimeException(
0317:                                        "No WhitePagesProtectionService");
0318:                            }
0319:                            query = protectS.unwrap(agent, wrapper);
0320:                        } catch (Exception e) {
0321:                            if (logger.isErrorEnabled()) {
0322:                                logger.error("Unable to unwrap (agent=" + agent
0323:                                        + " name=" + name + " query=" + obj
0324:                                        + ")", e);
0325:                            }
0326:                            query = null;
0327:                        }
0328:                        if (logger.isDetailEnabled()) {
0329:                            logger.detail("unwrapped " + sendObj + " to "
0330:                                    + query);
0331:                        }
0332:                    }
0333:                }
0334:
0335:                switch (action) {
0336:                case LOOKUP:
0337:                    return lookup(name, query, now);
0338:                case MODIFY:
0339:                    return modifyAndForward(name, query, now);
0340:                case FORWARD:
0341:                    return receiveForward(name, query, now);
0342:                case FORWARD_ANSWER:
0343:                    return resendForward(name, query, now);
0344:                default:
0345:                    throw new IllegalArgumentException("Invalid action: "
0346:                            + action);
0347:                }
0348:            }
0349:
0350:            private Object lookup(String name, Object query, long now) {
0351:                // assert (Thread.holdsLock(lock));
0352:
0353:                // find the closest DirEntry if it exists
0354:                DirEntry dir = findDir(name);
0355:
0356:                // get the record-entry if this is a non-list operation
0357:                RecordEntry rec = (dir == null ? (null) : dir
0358:                        .getRecordEntry(name));
0359:
0360:                Object answer;
0361:
0362:                UID queryUID;
0363:                if (query == null) {
0364:                    queryUID = null;
0365:                } else if (query instanceof  UID) {
0366:                    queryUID = (UID) query;
0367:                } else {
0368:                    // invalid
0369:                    queryUID = null;
0370:                }
0371:
0372:                boolean isList = (name.charAt(0) == '.');
0373:
0374:                // find current record info
0375:                UID uid;
0376:                long ttd;
0377:                Object data;
0378:                if (isList) {
0379:                    if (dir == null) {
0380:                        // not listed, so data is null
0381:                        uid = uidService.nextUID();
0382:                        ttd = config.failTTD;
0383:                        data = null;
0384:                    } else {
0385:                        // copy dir keys
0386:                        //
0387:                        // we could make this a rarely-modified-set
0388:                        uid = dir.getUID();
0389:                        ttd = config.successTTD;
0390:                        Map entries = dir.getEntries();
0391:                        Set keys = entries.keySet();
0392:                        data = new HashSet(keys);
0393:                    }
0394:                } else {
0395:                    if (rec == null) {
0396:                        // not listed, so data is null
0397:                        uid = uidService.nextUID();
0398:                        ttd = config.failTTD;
0399:                        data = null;
0400:                    } else {
0401:                        // return the data
0402:                        uid = rec.getUID();
0403:                        ttd = config.successTTD;
0404:                        data = rec.getData();
0405:                    }
0406:                }
0407:
0408:                if (queryUID != null && queryUID.equals(uid)) {
0409:                    // validated, so we don't send back the data
0410:                    answer = new RecordIsValid(uid, ttd);
0411:                } else {
0412:                    // return full record
0413:                    answer = new Record(uid, ttd, data);
0414:                }
0415:
0416:                if (logger.isDetailEnabled()) {
0417:                    logger.detail("lookup (name=" + name + " query=" + query
0418:                            + " now=" + now + ") returning " + answer);
0419:                }
0420:
0421:                return answer;
0422:            }
0423:
0424:            private Object modify(String name, Object query, long ttd, long now) {
0425:                // assert (Thread.holdsLock(lock));
0426:
0427:                boolean isList = (name.charAt(0) == '.');
0428:                if (isList) {
0429:                    // invalid modify
0430:                    if (logger.isErrorEnabled()) {
0431:                        logger.error("Modify (name=" + name + " query=" + query
0432:                                + ") is invalid, returning null");
0433:                    }
0434:                    return null;
0435:                }
0436:
0437:                // unwrap the query if it's wrapped
0438:
0439:                Object queryContent = query;
0440:                if (query instanceof  NameTag) {
0441:                    queryContent = ((NameTag) query).getObject();
0442:                }
0443:
0444:                UID queryUID;
0445:                boolean hasQueryData;
0446:                Object queryData;
0447:                if (queryContent instanceof  UID) {
0448:                    queryUID = (UID) queryContent;
0449:                    hasQueryData = false;
0450:                    queryData = null;
0451:                } else if (queryContent instanceof  Record) {
0452:                    Record record = (Record) queryContent;
0453:                    queryUID = record.getUID();
0454:                    hasQueryData = true;
0455:                    queryData = record.getData();
0456:                } else {
0457:                    // invalid
0458:                    queryUID = null;
0459:                    hasQueryData = false;
0460:                    queryData = null;
0461:                }
0462:                if (queryUID == null) {
0463:                    // invalid
0464:                    return null;
0465:                }
0466:
0467:                // find the closest DirEntry, create it if it doesn't exist
0468:                DirEntry dir = findOrCreateDir(name);
0469:                // assert (dir != null);
0470:
0471:                // get the record-entry, which may be null
0472:                RecordEntry rec = dir.getRecordEntry(name);
0473:
0474:                UID uid = (rec == null ? null : rec.getUID());
0475:                boolean sameUID = queryUID.equals(uid);
0476:
0477:                Object answer = null;
0478:                if (sameUID) {
0479:                    // successful lease renewal
0480:                    //
0481:                    // note that the client doesn't need to send the record
0482:                    // data to renew a lease.
0483:                } else if (!hasQueryData) {
0484:                    // the UIDs don't match, which usually means that the
0485:                    // client thinks it's renewing data but the server
0486:                    // doesn't know the data.
0487:                    //
0488:                    // we need the full record to decide if we need to replace
0489:                    // our entry or deny this lease.  Either we crashed, or the
0490:                    // client was talking to another server that hasn't
0491:                    // replicated that data to our server yet (e.g. due to a
0492:                    // crash or network partition), or some odd race condition
0493:                    // occurred.
0494:                    answer = new LeaseNotKnown(queryUID);
0495:                } else if (rec == null) {
0496:                    // this is a new record and the client passed us the data,
0497:                    // so accept it.
0498:                } else if (uid.getOwner().equals(queryUID.getOwner())) {
0499:                    // same author (node), so compare modification counters.
0500:                    if (uid.getId() <= queryUID.getId()) {
0501:                        // larger counter, so accept this update
0502:                    } else {
0503:                        // reject out-of-order update (should we simply ignore it?)
0504:                        Object reason = "Modify uid " + queryUID + " counter "
0505:                                + queryUID.getId()
0506:                                + " is less than the local uid " + uid
0507:                                + " counter " + uid.getId()
0508:                                + ", out of order update?";
0509:                        answer = new LeaseDenied(uid, reason, rec.getData());
0510:                    }
0511:                } else {
0512:                    // deconflict records from different authors
0513:                    //
0514:                    // extract the optional "moveId" version fields
0515:                    Object data = rec.getData();
0516:                    long version = getVersion(data);
0517:                    long queryVersion = getVersion(queryData);
0518:
0519:                    if (version < queryVersion) {
0520:                        // accept the replacement record
0521:                        //
0522:                        // note that versions can be negative, e.g. use negative
0523:                        // timestamps to favor old bindings. 
0524:                    } else if (version == 0 && queryVersion != 0) {
0525:                        // we always favor records with version numbers
0526:                    } else if (version == queryVersion) {
0527:                        // identical versions from different authors
0528:                        //
0529:                        // we need to compare *something* to prefer one of these
0530:                        // equivalent records.  We can't simply favor our existing
0531:                        // record or the new record, since then we could never settle
0532:                        // conflicts between servers (e.g. races and mixed delivery
0533:                        // orders).  We can't use virtual synchrony tricks and still
0534:                        // be fault tolerant.
0535:                        //
0536:                        // here we hash the UIDs and favor the larger value.  We don't
0537:                        // use "UID.hashCode()", since it uses "+" and is biased by
0538:                        // authors and counters, so instead we use "^".  This will seem
0539:                        // random to the clients but will behave identically when
0540:                        // performed in any order by servers peers.
0541:                        int h1 = uid.getOwner().hashCode() ^ (int) uid.getId();
0542:                        int h2 = queryUID.getOwner().hashCode()
0543:                                ^ (int) queryUID.getId();
0544:                        if (h2 < h1) {
0545:                            Object reason = "Modify uid " + queryUID + " hash "
0546:                                    + h2 + " is less than the local uid " + uid
0547:                                    + " hash " + h1;
0548:                            answer = new LeaseDenied(uid, reason, data);
0549:                        }
0550:                    } else {
0551:                        // old version
0552:                        Object reason = "Modify version " + queryVersion
0553:                                + " is greater than the local version "
0554:                                + version;
0555:                        answer = new LeaseDenied(uid, reason, data);
0556:                    }
0557:                }
0558:
0559:                if (answer != null) {
0560:                    // lease is either not known or denied
0561:                    if (logger.isDetailEnabled()) {
0562:                        logger.detail("modify (name=" + name + " query="
0563:                                + query + ") returning " + answer);
0564:                    }
0565:                    return answer;
0566:                }
0567:
0568:                //
0569:                // create or extend a lease
0570:                //
0571:
0572:                long ttl = now + ttd;
0573:
0574:                if (sameUID) {
0575:                    // extend an existing lease
0576:                    rec.setTTL(ttl);
0577:                } else if (queryData == null
0578:                        || (queryData instanceof  Map && ((Map) queryData)
0579:                                .isEmpty())) {
0580:                    // this is a full unbind
0581:                    if (rec == null) {
0582:                        // this is an odd case, where the client is telling
0583:                        // the server to unbind all its entries and the server
0584:                        // never heard of the client.  In this case the
0585:                        // client probably doesn't care what the server returns,
0586:                        // since it's already discarded its entries.  Still,
0587:                        // we must respond somehow...
0588:                    } else {
0589:                        Map entries = dir.getEntries();
0590:                        entries.remove(name);
0591:                        // bump dir uid to reflect the removed entry
0592:                        //
0593:                        // this allows "list" uid-based cache validation
0594:                        dir.setUID(uidService.nextUID());
0595:                    }
0596:                } else {
0597:                    if (rec == null) {
0598:                        // create the record
0599:                        rec = new RecordEntry(queryUID);
0600:                        Map entries = dir.getEntries();
0601:                        entries.put(name, rec);
0602:                        // bump dir uid to reflect the added entry
0603:                        //
0604:                        // this allows "list" uid-based cache validation
0605:                        dir.setUID(uidService.nextUID());
0606:                    }
0607:                    rec.setUID(queryUID);
0608:                    rec.setTTL(ttl);
0609:                    rec.setData(queryData);
0610:                }
0611:
0612:                answer = new Lease(queryUID, ttd);
0613:
0614:                if (logger.isDetailEnabled()) {
0615:                    logger.detail("modify (name=" + name + " query=" + query
0616:                            + ") returning " + answer);
0617:                }
0618:
0619:                return answer;
0620:            }
0621:
0622:            private Object modifyAndForward(String name, Object query, long now) {
0623:                // assert (Thread.holdsLock(lock));
0624:                Object answer = modify(name, query, config.expireTTD, now);
0625:
0626:                if (answer instanceof  Lease) {
0627:                    // forward lease to peers (excluding self and sender)
0628:                    //
0629:                    // note that the query can be a UID or a Record.  If a UID
0630:                    // is sent and a peer doesn't know the UID, then that peer
0631:                    // will send us a "forwardAnswer" with a LeaseNotKnown.
0632:                    Lease lease = (Lease) answer;
0633:                    Object queryContent = (query instanceof  NameTag ? ((NameTag) query)
0634:                            .getObject()
0635:                            : query);
0636:                    Record record = (queryContent instanceof  Record ? ((Record) queryContent)
0637:                            : null);
0638:                    Forward fwd = new Forward(lease, record);
0639:                    // to all
0640:                    forwardLater(name, fwd);
0641:                }
0642:
0643:                return answer;
0644:            }
0645:
0646:            private Object receiveForward(String name, Object query, long now) {
0647:                // assert (Thread.holdsLock(lock));
0648:
0649:                if (!(query instanceof  Forward)) {
0650:                    // invalid
0651:                    if (logger.isErrorEnabled()) {
0652:                        logger.error("Invalid forward (name=" + name
0653:                                + ", query=" + query + ")");
0654:                    }
0655:                    return null;
0656:                }
0657:
0658:                Forward fwd = (Forward) query;
0659:                Lease lease = (Lease) fwd.getLease();
0660:                Record record = (Record) fwd.getRecord();
0661:                Object modQuery;
0662:                if (record == null) {
0663:                    modQuery = lease.getUID();
0664:                } else {
0665:                    modQuery = record;
0666:                }
0667:                long modTTD = lease.getTTD();
0668:
0669:                // warn if our config.expireTTD is << the modTTD ?
0670:                //
0671:                // for consistency we'll accept our peer's ttd, since the
0672:                // client will renew based upon this ttd.  If we use a shorter
0673:                // ttd then the client will expire prematurely.  In practice
0674:                // we expect all the ttds to be equal.
0675:
0676:                Object answer = modify(name, modQuery, modTTD, now);
0677:
0678:                // filter out leases (they've already been forwarded) and
0679:                // denials (since they're likely a transient race condition)
0680:                //
0681:                // LeaseDenied responses are due to data conflicts.  The
0682:                // assumption is that our local data is better and either we
0683:                // or another peer has already sent the better data to the
0684:                // sender.
0685:                if (!(answer instanceof  LeaseNotKnown)) {
0686:                    return null;
0687:                }
0688:
0689:                // send back lease-not-known responses, since our peer
0690:                // sent us a UID and we lack the data.  The peer should
0691:                // reply by forwarding the Record.
0692:                return answer;
0693:            }
0694:
0695:            private Object resendForward(String name, Object query, long now) {
0696:                // assert (Thread.holdsLock(lock));
0697:
0698:                // find the lease and send the Record data
0699:                //
0700:                // this is similar to a "lookup" but we only want to find an
0701:                // exact match, plus we need the lease ttl and not a lookup ttl
0702:                UID queryUID = null;
0703:                DirEntry dir = null;
0704:                RecordEntry rec = null;
0705:                UID uid = null;
0706:                long ttd = -1;
0707:                String denied = ((!(query instanceof  LeaseNotKnown)) ? "query is not of type lease-not-known"
0708:                        : ((queryUID = ((LeaseNotKnown) query).getUID()) == null) ? "query uid is null"
0709:                                : (name.charAt(0) == '.') ? "name is invalid"
0710:                                        : ((dir = findDir(name)) == null) ? "directory is null"
0711:                                                : ((rec = dir
0712:                                                        .getRecordEntry(name)) == null) ? "no such record in our directory"
0713:                                                        : ((uid = rec.getUID()) == null) ? "local uid is null? "
0714:                                                                + rec
0715:                                                                : (!uid
0716:                                                                        .equals(queryUID)) ? "our local record has a different uid "
0717:                                                                        + rec
0718:                                                                        : ((ttd = rec
0719:                                                                                .getTTL()
0720:                                                                                - now) <= 0) ? "our local record has expired"
0721:                                                                                : (null));
0722:                if (denied != null) {
0723:                    // our local table doesn't contain this entry
0724:                    //
0725:                    // the non-matching UID case is assumed to be a race between
0726:                    // a forward that we've sent and someone asking about the old
0727:                    // UID.  Our forward should arrive soon enough.
0728:                    if (logger.isDebugEnabled()) {
0729:                        logger.debug("Ignoring resendForward for (name=" + name
0730:                                + ", query=" + query + "), " + denied);
0731:                    }
0732:                    return null;
0733:                }
0734:
0735:                Object data = rec.getData();
0736:                Lease lease = new Lease(uid, ttd);
0737:                Record record = new Record(uid, -1, data);
0738:                Forward fwd = new Forward(lease, record);
0739:
0740:                // okay, act as if we're forwarding it for the first time,
0741:                // but instead of sending it later we can send it back
0742:                // to the client.
0743:                if (logger.isDebugEnabled()) {
0744:                    logger.debug("Resending forward: " + fwd);
0745:                }
0746:                return fwd;
0747:            }
0748:
0749:            private DirEntry findDir(String name) {
0750:                return findOrCreateDir(name, false);
0751:            }
0752:
0753:            private DirEntry findOrCreateDir(String name) {
0754:                return findOrCreateDir(name, true);
0755:            }
0756:
0757:            private DirEntry findOrCreateDir(String name, boolean create) {
0758:
0759:                // extract the dir suffix, e.g.:
0760:                //   "."     -> "."
0761:                //   "a"     -> "."
0762:                //   "a."    -> "."
0763:                //   "a.b"   -> ".b"
0764:                //   "a.b."  -> ".b"
0765:                //   "a.b.c" -> ".b.c"
0766:                //   ".d"    -> ".d"
0767:                //   ".d."   -> ".d"
0768:                //   ".d.e"  -> ".d.e"
0769:                String suffix;
0770:                boolean isRoot;
0771:                int firstDot = name.indexOf('.');
0772:                if (firstDot < 0) {
0773:                    suffix = ".";
0774:                    isRoot = true;
0775:                } else {
0776:                    if (firstDot == 0) {
0777:                        suffix = name;
0778:                    } else {
0779:                        suffix = name.substring(firstDot);
0780:                    }
0781:                    int n = suffix.length();
0782:                    if (n == 1) {
0783:                        suffix = ".";
0784:                        isRoot = true;
0785:                    } else {
0786:                        if (suffix.charAt(n - 1) == '.') {
0787:                            --n;
0788:                        }
0789:                        suffix = suffix.substring(0, n);
0790:                        isRoot = false;
0791:                    }
0792:                }
0793:                // assert (suffix.startsWith("."));
0794:                // assert (suffix.equals(".") || !suffix.endsWith("."));
0795:
0796:                if (rootDir == null) {
0797:                    UID uid = uidService.nextUID();
0798:                    rootDir = new DirEntry(uid);
0799:                }
0800:                DirEntry dir = rootDir;
0801:
0802:                if (isRoot) {
0803:                    return dir;
0804:                }
0805:
0806:                // subdir, possibly deep
0807:                int i = suffix.lastIndexOf('.');
0808:                while (true) {
0809:                    String s = suffix.substring(i);
0810:                    Map entries = dir.getEntries();
0811:                    DirEntry subdir = (DirEntry) entries.get(s);
0812:                    if (subdir == null) {
0813:                        // no such dir
0814:                        if (!create) {
0815:                            dir = null;
0816:                            break;
0817:                        }
0818:                        dir.setUID(uidService.nextUID()); // bump dir uid
0819:                        subdir = new DirEntry(uidService.nextUID());
0820:                        entries.put(s, subdir);
0821:                    }
0822:                    // recurse down
0823:                    dir = subdir;
0824:                    if (i == 0) {
0825:                        // found dir
0826:                        break;
0827:                    }
0828:                    i = suffix.lastIndexOf('.', i - 1);
0829:                    // assert (0 <= i : "invalid suffix: "+suffix);
0830:                }
0831:
0832:                return dir;
0833:            }
0834:
0835:            /**
0836:             * Given a map of AddressEntries, extract the "version"
0837:             * entry's moveId.
0838:             * <p>
0839:             * The version entry format is:<pre>
0840:             *   version:///<i>incarnation</i>[/<i>moveId</i>]
0841:             * </pre>
0842:             * if the moveId is not specified then it's equivalent to the
0843:             * incarnation number.
0844:             *
0845:             * @return zero if the data doesn't contain version information 
0846:             */
0847:            private long getVersion(Object data) {
0848:                if (!(data instanceof  Map)) {
0849:                    return 0;
0850:                }
0851:                Map m = (Map) data;
0852:                Object v = m.get("version");
0853:                if (!(v instanceof  AddressEntry)) {
0854:                    return 0;
0855:                }
0856:                AddressEntry ae = (AddressEntry) v;
0857:                URI uri = ae.getURI();
0858:                if (uri == null) {
0859:                    return 0;
0860:                }
0861:                String path = uri.getPath();
0862:                if (path == null || path.length() < 1) {
0863:                    return 0;
0864:                }
0865:                int sepIdx = path.indexOf('/', 1);
0866:                String s;
0867:                if (sepIdx < 0) {
0868:                    s = path.substring(1);
0869:                } else {
0870:                    s = path.substring(sepIdx + 1);
0871:                }
0872:                long ret;
0873:                try {
0874:                    ret = Long.parseLong(s);
0875:                } catch (NumberFormatException nfe) {
0876:                    return 0;
0877:                }
0878:                return ret;
0879:            }
0880:
0881:            /**
0882:             * Scan a map of Forward objects to find the max lease ttd,
0883:             * which we use to set the message timeout.
0884:             */
0885:            private long findMaxTTD(Map m) {
0886:                long maxTTD = -1;
0887:                for (Iterator iter = m.values().iterator(); iter.hasNext();) {
0888:                    Object o = iter.next();
0889:                    if (!(o instanceof  Forward)) {
0890:                        continue;
0891:                    }
0892:                    Forward fwd = (Forward) o;
0893:                    Lease lease = fwd.getLease();
0894:                    long ttd = lease.getTTD();
0895:                    if (maxTTD < ttd) {
0896:                        maxTTD = ttd;
0897:                    }
0898:                }
0899:                return maxTTD;
0900:            }
0901:
0902:            /**
0903:             * Batch forwards from ourself.
0904:             * <p>
0905:             * This is simply a performance optimization, since we can
0906:             * batch our replications.  We can't batch for too long
0907:             * relative to our expireTTD, otherwise we might delay a lease
0908:             * renewal past its expiration time and our peers will remove
0909:             * it.  About (0.75*expireTime - deliveryTime) is probably fine.
0910:             */
0911:            private void forwardLater(String name, Forward fwd) {
0912:                // assert (Thread.holdsLock(lock));
0913:
0914:                // if the queue already contains a forward with the same uid
0915:                // then we should keep the record data of the old forward.
0916:                // This occurs when we've queued both a new record (with data)
0917:                // and a lease renewal (no data) -- we want to forward the
0918:                // latest lease TTL with the record data, otherwise we won't
0919:                // forward the data and our peers will complain about a
0920:                // "lease-not-known".
0921:                Forward newFwd = fwd;
0922:                Forward oldFwd = (Forward) forwardQueue.get(name);
0923:                if (oldFwd != null) {
0924:                    Lease lease = fwd.getLease();
0925:                    UID uid = lease.getUID();
0926:                    Lease oldLease = oldFwd.getLease();
0927:                    UID oldUID = oldLease.getUID();
0928:                    if (uid.equals(oldUID)) {
0929:                        Record record = fwd.getRecord();
0930:                        Record oldRecord = oldFwd.getRecord();
0931:                        if (record == null && oldRecord != null) {
0932:                            newFwd = new Forward(lease, oldRecord);
0933:                        }
0934:                    }
0935:                }
0936:
0937:                // assert (newFwd != null);
0938:                forwardQueue.put(name, newFwd);
0939:
0940:                // schedule the thread if it's not scheduled?
0941:                //
0942:                // the schedulable API makes this tricky, so we'll simply
0943:                // keep a steady schedule
0944:            }
0945:
0946:            private void forwardNow() {
0947:                // take the queue
0948:                Map m;
0949:                synchronized (lock) {
0950:                    if (forwardQueue.isEmpty()) {
0951:                        m = null;
0952:                    } else {
0953:                        m = new HashMap(forwardQueue);
0954:                        m = Collections.unmodifiableMap(m);
0955:                        forwardQueue.clear();
0956:                    }
0957:                }
0958:
0959:                if (m != null) {
0960:                    // find the max expire time for these forwards, so we
0961:                    // can set the message timeout
0962:                    long maxTTD = findMaxTTD(m);
0963:                    forwardService.forward(m, maxTTD);
0964:                }
0965:
0966:                // run me again later
0967:                forwardThread.schedule(config.forwardPeriod);
0968:            }
0969:
0970:            /**
0971:             * Find expired leases and remove them. 
0972:             *
0973:             * @note recursive!
0974:             */
0975:            private boolean expireLeases(String suffix, DirEntry dir, long now) {
0976:                // assert (Thread.holdsLock(lock));
0977:
0978:                boolean hasChanged = false;
0979:                Map entries = dir.getEntries();
0980:                for (Iterator iter = entries.entrySet().iterator(); iter
0981:                        .hasNext();) {
0982:                    Map.Entry me = (Map.Entry) iter.next();
0983:                    String name = (String) me.getKey();
0984:                    Object value = me.getValue();
0985:                    if (value instanceof  DirEntry) {
0986:                        // recurse!
0987:                        DirEntry subdir = (DirEntry) value;
0988:                        expireLeases(name, subdir, now);
0989:                        if (subdir.isEmpty()) {
0990:                            // all expired
0991:                            iter.remove();
0992:                            hasChanged = true;
0993:                            if (logger.isInfoEnabled()) {
0994:                                logger.info("Expired " + subdir.toString(now));
0995:                            }
0996:                        }
0997:                    } else if (value instanceof  RecordEntry) {
0998:                        RecordEntry re = (RecordEntry) value;
0999:                        long ttl = re.getTTL();
1000:                        if (ttl < now) {
1001:                            // expired
1002:                            iter.remove();
1003:                            hasChanged = true;
1004:                            if (logger.isInfoEnabled()) {
1005:                                logger.info("Expired " + re.toString(now));
1006:                            }
1007:                        } else {
1008:                            // okay for now
1009:                        }
1010:                    } else {
1011:                        throw new RuntimeException(
1012:                                "Unexpected DirEntry element: (" + name + "="
1013:                                        + value + ")");
1014:                    }
1015:                }
1016:
1017:                if (hasChanged && !dir.isEmpty()) {
1018:                    // changed the dir "list" contents, so we must change
1019:                    // the dir's uid.
1020:                    dir.setUID(uidService.nextUID());
1021:                }
1022:
1023:                return hasChanged;
1024:            }
1025:
1026:            private void expireLeases() {
1027:                synchronized (lock) {
1028:                    long now = System.currentTimeMillis();
1029:
1030:                    DirEntry dir = findDir(".");
1031:
1032:                    if (logger.isDetailEnabled()) {
1033:                        StringBuffer buf = new StringBuffer();
1034:                        buf
1035:                                .append("##### server entries ##############################");
1036:                        dir.append(buf, ".", "\n  ", now);
1037:                        buf
1038:                                .append("\n"
1039:                                        + "###################################################");
1040:                        logger.detail(buf.toString());
1041:                    }
1042:
1043:                    expireLeases(".", dir, now);
1044:                }
1045:
1046:                // run me again later
1047:                expireThread.schedule(config.checkExpirePeriod);
1048:            }
1049:
1050:            /** implement all the various client APIs */
1051:            private class MyClient implements  PingAckService.Client,
1052:                    LookupAckService.Client, ModifyAckService.Client,
1053:                    ForwardAckService.Client, ForwardService.Client {
1054:                public void ping(MessageAddress clientAddr, long clientTime,
1055:                        Map m) {
1056:                    handleAll(PING, clientAddr, clientTime, m);
1057:                }
1058:
1059:                public void lookup(MessageAddress clientAddr, long clientTime,
1060:                        Map m) {
1061:                    handleAll(LOOKUP, clientAddr, clientTime, m);
1062:                }
1063:
1064:                public void modify(MessageAddress clientAddr, long clientTime,
1065:                        Map m) {
1066:                    handleAll(MODIFY, clientAddr, clientTime, m);
1067:                }
1068:
1069:                public void forward(MessageAddress clientAddr, long clientTime,
1070:                        Map m) {
1071:                    handleAll(FORWARD, clientAddr, clientTime, m);
1072:                }
1073:
1074:                public void forwardAnswer(MessageAddress clientAddr,
1075:                        long baseTime, Map m) {
1076:                    handleAll(FORWARD_ANSWER, clientAddr, baseTime, m);
1077:                }
1078:            }
1079:
1080:            /** config options */
1081:            private static class RootConfig {
1082:                public final long successTTD;
1083:                public final long failTTD;
1084:                public final long expireTTD;
1085:                public final long forwardPeriod;
1086:                public final long checkExpirePeriod;
1087:
1088:                public RootConfig(Object o) {
1089:                    Parameters p = new Parameters(o,
1090:                            "org.cougaar.core.wp.server.");
1091:                    successTTD = p.getLong("successTTD", 90000);
1092:                    failTTD = p.getLong("failTTD", 30000);
1093:                    expireTTD = p.getLong("expireTTD", 240000);
1094:                    forwardPeriod = p.getLong("forwardPeriod", 30000);
1095:                    checkExpirePeriod = p.getLong("checkExpirePeriod", 30000);
1096:                }
1097:            }
1098:
1099:            private static abstract class Entry {
1100:                private UID uid;
1101:
1102:                public Entry(UID uid) {
1103:                    _setUID(uid);
1104:                }
1105:
1106:                private void _setUID(UID uid) {
1107:                    if (uid == null) {
1108:                        throw new IllegalArgumentException("null uid");
1109:                    }
1110:                    this .uid = uid;
1111:                }
1112:
1113:                public void setUID(UID uid) {
1114:                    _setUID(uid);
1115:                }
1116:
1117:                public UID getUID() {
1118:                    return uid;
1119:                }
1120:
1121:                public String toString() {
1122:                    long now = System.currentTimeMillis();
1123:                    return toString(now);
1124:                }
1125:
1126:                public abstract String toString(long now);
1127:            }
1128:
1129:            private static class DirEntry extends Entry {
1130:
1131:                // the child entries, which can be a mix of
1132:                // dir-entries and record-entries.
1133:                //
1134:                // The string key for dir-entries always start
1135:                // with a '.', and record-entries never start
1136:                // with a '.'.
1137:                //
1138:                // <String, Entry>
1139:                private final Map entries = new HashMap();
1140:
1141:                public DirEntry(UID uid) {
1142:                    super (uid);
1143:                }
1144:
1145:                public boolean isEmpty() {
1146:                    return entries.isEmpty();
1147:                }
1148:
1149:                public RecordEntry getRecordEntry(String name) {
1150:                    return ((name.charAt(0) == '.') ? (null)
1151:                            : (RecordEntry) entries.get(name));
1152:                }
1153:
1154:                // the client can directly modify this map
1155:                public Map getEntries() {
1156:                    return entries;
1157:                }
1158:
1159:                public String toString(long now) {
1160:                    StringBuffer buf = new StringBuffer();
1161:                    append(buf, "?", "\n  ", now);
1162:                    return buf.toString();
1163:                }
1164:
1165:                /** @note recursive! */
1166:                public void append(StringBuffer buf, String suffix,
1167:                        String indent, long now) {
1168:                    // assert (Thread.holdsLock(lock));
1169:                    // assert (indent.startsWith("\n"));
1170:
1171:                    buf.append(indent).append("suffix=").append(suffix);
1172:                    buf.append(indent).append("uid=").append(getUID());
1173:
1174:                    Map m = this .getEntries();
1175:
1176:                    // sort
1177:                    Object[] keys = m.keySet().toArray();
1178:                    Arrays.sort(keys);
1179:
1180:                    buf.append(indent).append("entries[");
1181:                    buf.append(m.size()).append("]={");
1182:
1183:                    String subindent = indent + "  ";
1184:                    for (int i = 0; i < keys.length; i++) {
1185:                        String name = (String) keys[i];
1186:                        Object value = m.get(name);
1187:                        if (value instanceof  DirEntry) {
1188:                            // recurse!
1189:                            DirEntry subdir = (DirEntry) value;
1190:                            subdir.append(buf, name, subindent, now);
1191:                        } else if (value instanceof  RecordEntry) {
1192:                            RecordEntry re = (RecordEntry) value;
1193:                            buf.append(subindent).append(name).append("=");
1194:                            buf.append(re.toString(now));
1195:                        } else {
1196:                            throw new RuntimeException(
1197:                                    "Unexpected DirEntry element: (" + name
1198:                                            + "=" + value + ")");
1199:                        }
1200:                    }
1201:
1202:                    buf.append(indent).append("}");
1203:                }
1204:            }
1205:
1206:            private static class RecordEntry extends Entry {
1207:
1208:                private long ttl;
1209:                private Object data;
1210:
1211:                public RecordEntry(UID uid) {
1212:                    super (uid);
1213:                }
1214:
1215:                public void setTTL(long ttl) {
1216:                    this .ttl = ttl;
1217:                }
1218:
1219:                public long getTTL() {
1220:                    return ttl;
1221:                }
1222:
1223:                public void setData(Object data) {
1224:                    this .data = data;
1225:                }
1226:
1227:                public Object getData() {
1228:                    return data;
1229:                }
1230:
1231:                public String toString(long now) {
1232:                    return "(record uid=" + getUID() + " ttl="
1233:                            + Timestamp.toString(ttl, now) + " data=" + data
1234:                            + ")";
1235:                }
1236:            }
1237:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.