0001: /*
0002: * <copyright>
0003: *
0004: * Copyright 1997-2004 BBNT Solutions, LLC
0005: * under sponsorship of the Defense Advanced Research Projects
0006: * Agency (DARPA).
0007: *
0008: * You can redistribute this software and/or modify it under the
0009: * terms of the Cougaar Open Source License as published on the
0010: * Cougaar Open Source Website (www.cougaar.org).
0011: *
0012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023: *
0024: * </copyright>
0025: */
0026:
0027: package org.cougaar.core.wp.server;
0028:
0029: import java.net.URI;
0030: import java.util.Arrays;
0031: import java.util.Collection;
0032: import java.util.Collections;
0033: import java.util.HashMap;
0034: import java.util.HashSet;
0035: import java.util.Iterator;
0036: import java.util.Map;
0037: import java.util.Set;
0038: import org.cougaar.core.component.Component;
0039: import org.cougaar.core.component.ServiceBroker;
0040: import org.cougaar.core.component.ServiceRevokedListener;
0041: import org.cougaar.core.mts.MessageAddress;
0042: import org.cougaar.core.service.LoggingService;
0043: import org.cougaar.core.service.ThreadService;
0044: import org.cougaar.core.service.UIDService;
0045: import org.cougaar.core.service.wp.AddressEntry;
0046: import org.cougaar.core.service.wp.WhitePagesProtectionService;
0047: import org.cougaar.core.thread.Schedulable;
0048: import org.cougaar.core.util.UID;
0049: import org.cougaar.core.wp.Parameters;
0050: import org.cougaar.core.wp.Timestamp;
0051: import org.cougaar.core.wp.resolver.Lease;
0052: import org.cougaar.core.wp.resolver.LeaseDenied;
0053: import org.cougaar.core.wp.resolver.LeaseNotKnown;
0054: import org.cougaar.core.wp.resolver.NameTag;
0055: import org.cougaar.core.wp.resolver.Record;
0056: import org.cougaar.core.wp.resolver.RecordIsValid;
0057: import org.cougaar.util.GenericStateModelAdapter;
0058:
0059: /**
0060: * This component is the white pages server implementation.
0061: * <p>
0062: * This implementation supports replication but not naming
0063: * hierarchies.
0064: * <p>
0065: * Refactor me!
0066: */
0067: public class RootAuthority extends GenericStateModelAdapter implements
0068: Component {
// Action codes passed to handleAll/handle to select the operation.
private static final int LOOKUP = 0;
private static final int MODIFY = 1;
private static final int FORWARD = 2;
private static final int PING = 3;
private static final int FORWARD_ANSWER = 4;

// Configuration; set by setParameter or lazily by configure during load.
private RootConfig config;

// Services injected through the setters, plus the optional protection
// service that load() looks up itself (may be null).
private ServiceBroker sb;
private LoggingService logger;
private ThreadService threadService;
private UIDService uidService;
private WhitePagesProtectionService protectS;

// Server-transport services obtained in load() with myClient as requestor.
private PingAckService pingAckService;
private LookupAckService lookupAckService;
private ModifyAckService modifyAckService;
private ForwardAckService forwardAckService;
private ForwardService forwardService;

// Periodic tasks: expireThread runs expireLeases(), forwardThread runs
// forwardNow().
private Schedulable expireThread;
private Schedulable forwardThread;

// Callback object registered with the transport services.
private final MyClient myClient = new MyClient();

// Held while handling requests, draining forwardQueue and expiring leases.
private final Object lock = new Object();

// Root of the directory tree; created lazily by findOrCreateDir.
private DirEntry rootDir;

// Pending forwards keyed by name (String -> Forward), drained by forwardNow().
private final Map forwardQueue = new HashMap();
0099:
0100: public void setParameter(Object o) {
0101: this .config = new RootConfig(o);
0102: }
0103:
0104: private void configure(Object o) {
0105: if (config != null) {
0106: return;
0107: }
0108: config = new RootConfig(o);
0109: }
0110:
0111: public void setServiceBroker(ServiceBroker sb) {
0112: this .sb = sb;
0113: }
0114:
0115: public void setLoggingService(LoggingService logger) {
0116: this .logger = logger;
0117: }
0118:
0119: public void setThreadService(ThreadService threadService) {
0120: this .threadService = threadService;
0121: }
0122:
0123: public void setUIDService(UIDService uidService) {
0124: this .uidService = uidService;
0125: }
0126:
0127: public void load() {
0128: super .load();
0129:
0130: configure(null);
0131:
0132: if (logger.isDebugEnabled()) {
0133: logger.debug("Loading server root authority");
0134: }
0135:
0136: protectS = (WhitePagesProtectionService) sb.getService(this ,
0137: WhitePagesProtectionService.class, null);
0138: if (logger.isDebugEnabled()) {
0139: logger.debug((protectS == null ? "Didn't find" : "Found")
0140: + " white pages protection service");
0141: }
0142:
0143: // create forward timer
0144: Runnable forwardRunner = new Runnable() {
0145: public void run() {
0146: // assert thread == forwardThread;
0147: forwardNow();
0148: }
0149: };
0150: forwardThread = threadService.getThread(this , forwardRunner,
0151: "White pages server forward leases");
0152: forwardThread.schedule(config.forwardPeriod);
0153:
0154: // create expiration timer
0155: Runnable expireRunner = new Runnable() {
0156: public void run() {
0157: // assert thread == expireThread;
0158: expireLeases();
0159: }
0160: };
0161: expireThread = threadService.getThread(this , expireRunner,
0162: "White pages server expiration checker");
0163: expireThread.schedule(config.checkExpirePeriod);
0164:
0165: // register for server-transport
0166: pingAckService = (PingAckService) sb.getService(myClient,
0167: PingAckService.class, null);
0168: lookupAckService = (LookupAckService) sb.getService(myClient,
0169: LookupAckService.class, null);
0170: modifyAckService = (ModifyAckService) sb.getService(myClient,
0171: ModifyAckService.class, null);
0172: forwardAckService = (ForwardAckService) sb.getService(myClient,
0173: ForwardAckService.class, null);
0174: forwardService = (ForwardService) sb.getService(myClient,
0175: ForwardService.class, null);
0176: String s = (pingAckService == null ? "PingAckService"
0177: : lookupAckService == null ? "LookupAckService"
0178: : modifyAckService == null ? "ModifyAckService"
0179: : forwardAckService == null ? "forwardAckService"
0180: : forwardService == null ? "forwardService"
0181: : null);
0182: if (s != null) {
0183: throw new RuntimeException("Unable to obtain " + s);
0184: }
0185: }
0186:
0187: public void unload() {
0188: expireThread.cancel();
0189:
0190: // release services
0191: if (forwardService != null) {
0192: sb.releaseService(myClient, ForwardService.class,
0193: forwardService);
0194: forwardService = null;
0195: }
0196: if (forwardAckService != null) {
0197: sb.releaseService(myClient, ForwardAckService.class,
0198: forwardAckService);
0199: forwardAckService = null;
0200: }
0201: if (modifyAckService != null) {
0202: sb.releaseService(myClient, ModifyAckService.class,
0203: modifyAckService);
0204: modifyAckService = null;
0205: }
0206: if (lookupAckService != null) {
0207: sb.releaseService(myClient, LookupAckService.class,
0208: lookupAckService);
0209: lookupAckService = null;
0210: }
0211: if (pingAckService != null) {
0212: sb.releaseService(myClient, PingAckService.class,
0213: pingAckService);
0214: pingAckService = null;
0215: }
0216: if (threadService != null) {
0217: sb.releaseService(this , ThreadService.class, threadService);
0218: threadService = null;
0219: }
0220: if (uidService != null) {
0221: sb.releaseService(this , UIDService.class, uidService);
0222: uidService = null;
0223: }
0224: if (logger != null) {
0225: sb.releaseService(this , LoggingService.class, logger);
0226: logger = null;
0227: }
0228:
0229: super .unload();
0230: }
0231:
0232: /**
0233: * Callback from on of our registered services:<ul>
0234: * <li>PingAckService: <i>PING</i></li>
0235: * <li>LookupAckService: <i>LOOKUP</i></li>
0236: * <li>ModifyAckService: <i>MODIFY</i></li>
0237: * <li>ForwardAckService: <i>FORWARD</i></li>
0238: * <li>ForwardService: <i>FORWARD_ANSWER</i></li>
0239: * </ul>
0240: */
0241: private void handleAll(int action, MessageAddress clientAddr,
0242: long clientTime, Map m) {
0243: if (action == PING) {
0244: // empty ping-ack
0245: pingAckService.pingAnswer(clientAddr, clientTime, null);
0246: return;
0247: }
0248:
0249: int n = (m == null ? 0 : m.size());
0250: if (n == 0) {
0251: return;
0252: }
0253: Map answers = null;
0254: synchronized (lock) {
0255: long now = System.currentTimeMillis();
0256: for (Iterator iter = m.entrySet().iterator(); iter
0257: .hasNext();) {
0258: Map.Entry me = (Map.Entry) iter.next();
0259: String name = (String) me.getKey();
0260: Object sendObj = me.getValue();
0261: Object answer = handle(action, name, sendObj, now);
0262: if (answer == null) {
0263: continue;
0264: }
0265: if (n == 1) {
0266: answers = Collections.singletonMap(name, answer);
0267: } else {
0268: if (answers == null) {
0269: answers = new HashMap();
0270: }
0271: answers.put(name, answer);
0272: }
0273: }
0274: }
0275: if (answers == null) {
0276: return;
0277: }
0278: // ugly switch, refactor me...
0279: switch (action) {
0280: case LOOKUP:
0281: lookupAckService.lookupAnswer(clientAddr, clientTime,
0282: answers);
0283: break;
0284: case MODIFY:
0285: modifyAckService.modifyAnswer(clientAddr, clientTime,
0286: answers);
0287: break;
0288: case FORWARD:
0289: forwardAckService.forwardAnswer(clientAddr, clientTime,
0290: answers);
0291: break;
0292: case FORWARD_ANSWER:
0293: long maxTTD = findMaxTTD(answers);
0294: forwardService.forward(clientAddr, answers, maxTTD);
0295: break;
0296: default:
0297: throw new IllegalArgumentException("Invalid action: "
0298: + action);
0299: }
0300: }
0301:
0302: private Object handle(int action, String name, Object sendObj,
0303: long now) {
0304: // assert (Thread.holdsLock(lock));
0305:
0306: // unwrap
0307: Object query = sendObj;
0308: if (sendObj instanceof NameTag) {
0309: NameTag nametag = (NameTag) sendObj;
0310: String agent = nametag.getName();
0311: Object obj = nametag.getObject();
0312: if (obj instanceof WhitePagesProtectionService.Wrapper) {
0313: WhitePagesProtectionService.Wrapper wrapper = (WhitePagesProtectionService.Wrapper) obj;
0314: try {
0315: if (protectS == null) {
0316: throw new RuntimeException(
0317: "No WhitePagesProtectionService");
0318: }
0319: query = protectS.unwrap(agent, wrapper);
0320: } catch (Exception e) {
0321: if (logger.isErrorEnabled()) {
0322: logger.error("Unable to unwrap (agent=" + agent
0323: + " name=" + name + " query=" + obj
0324: + ")", e);
0325: }
0326: query = null;
0327: }
0328: if (logger.isDetailEnabled()) {
0329: logger.detail("unwrapped " + sendObj + " to "
0330: + query);
0331: }
0332: }
0333: }
0334:
0335: switch (action) {
0336: case LOOKUP:
0337: return lookup(name, query, now);
0338: case MODIFY:
0339: return modifyAndForward(name, query, now);
0340: case FORWARD:
0341: return receiveForward(name, query, now);
0342: case FORWARD_ANSWER:
0343: return resendForward(name, query, now);
0344: default:
0345: throw new IllegalArgumentException("Invalid action: "
0346: + action);
0347: }
0348: }
0349:
0350: private Object lookup(String name, Object query, long now) {
0351: // assert (Thread.holdsLock(lock));
0352:
0353: // find the closest DirEntry if it exists
0354: DirEntry dir = findDir(name);
0355:
0356: // get the record-entry if this is a non-list operation
0357: RecordEntry rec = (dir == null ? (null) : dir
0358: .getRecordEntry(name));
0359:
0360: Object answer;
0361:
0362: UID queryUID;
0363: if (query == null) {
0364: queryUID = null;
0365: } else if (query instanceof UID) {
0366: queryUID = (UID) query;
0367: } else {
0368: // invalid
0369: queryUID = null;
0370: }
0371:
0372: boolean isList = (name.charAt(0) == '.');
0373:
0374: // find current record info
0375: UID uid;
0376: long ttd;
0377: Object data;
0378: if (isList) {
0379: if (dir == null) {
0380: // not listed, so data is null
0381: uid = uidService.nextUID();
0382: ttd = config.failTTD;
0383: data = null;
0384: } else {
0385: // copy dir keys
0386: //
0387: // we could make this a rarely-modified-set
0388: uid = dir.getUID();
0389: ttd = config.successTTD;
0390: Map entries = dir.getEntries();
0391: Set keys = entries.keySet();
0392: data = new HashSet(keys);
0393: }
0394: } else {
0395: if (rec == null) {
0396: // not listed, so data is null
0397: uid = uidService.nextUID();
0398: ttd = config.failTTD;
0399: data = null;
0400: } else {
0401: // return the data
0402: uid = rec.getUID();
0403: ttd = config.successTTD;
0404: data = rec.getData();
0405: }
0406: }
0407:
0408: if (queryUID != null && queryUID.equals(uid)) {
0409: // validated, so we don't send back the data
0410: answer = new RecordIsValid(uid, ttd);
0411: } else {
0412: // return full record
0413: answer = new Record(uid, ttd, data);
0414: }
0415:
0416: if (logger.isDetailEnabled()) {
0417: logger.detail("lookup (name=" + name + " query=" + query
0418: + " now=" + now + ") returning " + answer);
0419: }
0420:
0421: return answer;
0422: }
0423:
0424: private Object modify(String name, Object query, long ttd, long now) {
0425: // assert (Thread.holdsLock(lock));
0426:
0427: boolean isList = (name.charAt(0) == '.');
0428: if (isList) {
0429: // invalid modify
0430: if (logger.isErrorEnabled()) {
0431: logger.error("Modify (name=" + name + " query=" + query
0432: + ") is invalid, returning null");
0433: }
0434: return null;
0435: }
0436:
0437: // unwrap the query if it's wrapped
0438:
0439: Object queryContent = query;
0440: if (query instanceof NameTag) {
0441: queryContent = ((NameTag) query).getObject();
0442: }
0443:
0444: UID queryUID;
0445: boolean hasQueryData;
0446: Object queryData;
0447: if (queryContent instanceof UID) {
0448: queryUID = (UID) queryContent;
0449: hasQueryData = false;
0450: queryData = null;
0451: } else if (queryContent instanceof Record) {
0452: Record record = (Record) queryContent;
0453: queryUID = record.getUID();
0454: hasQueryData = true;
0455: queryData = record.getData();
0456: } else {
0457: // invalid
0458: queryUID = null;
0459: hasQueryData = false;
0460: queryData = null;
0461: }
0462: if (queryUID == null) {
0463: // invalid
0464: return null;
0465: }
0466:
0467: // find the closest DirEntry, create it if it doesn't exist
0468: DirEntry dir = findOrCreateDir(name);
0469: // assert (dir != null);
0470:
0471: // get the record-entry, which may be null
0472: RecordEntry rec = dir.getRecordEntry(name);
0473:
0474: UID uid = (rec == null ? null : rec.getUID());
0475: boolean sameUID = queryUID.equals(uid);
0476:
0477: Object answer = null;
0478: if (sameUID) {
0479: // successful lease renewal
0480: //
0481: // note that the client doesn't need to send the record
0482: // data to renew a lease.
0483: } else if (!hasQueryData) {
0484: // the UIDs don't match, which usually means that the
0485: // client thinks it's renewing data but the server
0486: // doesn't know the data.
0487: //
0488: // we need the full record to decide if we need to replace
0489: // our entry or deny this lease. Either we crashed, or the
0490: // client was talking to another server that hasn't
0491: // replicated that data to our server yet (e.g. due to a
0492: // crash or network partition), or some odd race condition
0493: // occurred.
0494: answer = new LeaseNotKnown(queryUID);
0495: } else if (rec == null) {
0496: // this is a new record and the client passed us the data,
0497: // so accept it.
0498: } else if (uid.getOwner().equals(queryUID.getOwner())) {
0499: // same author (node), so compare modification counters.
0500: if (uid.getId() <= queryUID.getId()) {
0501: // larger counter, so accept this update
0502: } else {
0503: // reject out-of-order update (should we simply ignore it?)
0504: Object reason = "Modify uid " + queryUID + " counter "
0505: + queryUID.getId()
0506: + " is less than the local uid " + uid
0507: + " counter " + uid.getId()
0508: + ", out of order update?";
0509: answer = new LeaseDenied(uid, reason, rec.getData());
0510: }
0511: } else {
0512: // deconflict records from different authors
0513: //
0514: // extract the optional "moveId" version fields
0515: Object data = rec.getData();
0516: long version = getVersion(data);
0517: long queryVersion = getVersion(queryData);
0518:
0519: if (version < queryVersion) {
0520: // accept the replacement record
0521: //
0522: // note that versions can be negative, e.g. use negative
0523: // timestamps to favor old bindings.
0524: } else if (version == 0 && queryVersion != 0) {
0525: // we always favor records with version numbers
0526: } else if (version == queryVersion) {
0527: // identical versions from different authors
0528: //
0529: // we need to compare *something* to prefer one of these
0530: // equivalent records. We can't simply favor our existing
0531: // record or the new record, since then we could never settle
0532: // conflicts between servers (e.g. races and mixed delivery
0533: // orders). We can't use virtual synchrony tricks and still
0534: // be fault tolerant.
0535: //
0536: // here we hash the UIDs and favor the larger value. We don't
0537: // use "UID.hashCode()", since it uses "+" and is biased by
0538: // authors and counters, so instead we use "^". This will seem
0539: // random to the clients but will behave identically when
0540: // performed in any order by servers peers.
0541: int h1 = uid.getOwner().hashCode() ^ (int) uid.getId();
0542: int h2 = queryUID.getOwner().hashCode()
0543: ^ (int) queryUID.getId();
0544: if (h2 < h1) {
0545: Object reason = "Modify uid " + queryUID + " hash "
0546: + h2 + " is less than the local uid " + uid
0547: + " hash " + h1;
0548: answer = new LeaseDenied(uid, reason, data);
0549: }
0550: } else {
0551: // old version
0552: Object reason = "Modify version " + queryVersion
0553: + " is greater than the local version "
0554: + version;
0555: answer = new LeaseDenied(uid, reason, data);
0556: }
0557: }
0558:
0559: if (answer != null) {
0560: // lease is either not known or denied
0561: if (logger.isDetailEnabled()) {
0562: logger.detail("modify (name=" + name + " query="
0563: + query + ") returning " + answer);
0564: }
0565: return answer;
0566: }
0567:
0568: //
0569: // create or extend a lease
0570: //
0571:
0572: long ttl = now + ttd;
0573:
0574: if (sameUID) {
0575: // extend an existing lease
0576: rec.setTTL(ttl);
0577: } else if (queryData == null
0578: || (queryData instanceof Map && ((Map) queryData)
0579: .isEmpty())) {
0580: // this is a full unbind
0581: if (rec == null) {
0582: // this is an odd case, where the client is telling
0583: // the server to unbind all its entries and the server
0584: // never heard of the client. In this case the
0585: // client probably doesn't care what the server returns,
0586: // since it's already discarded its entries. Still,
0587: // we must respond somehow...
0588: } else {
0589: Map entries = dir.getEntries();
0590: entries.remove(name);
0591: // bump dir uid to reflect the removed entry
0592: //
0593: // this allows "list" uid-based cache validation
0594: dir.setUID(uidService.nextUID());
0595: }
0596: } else {
0597: if (rec == null) {
0598: // create the record
0599: rec = new RecordEntry(queryUID);
0600: Map entries = dir.getEntries();
0601: entries.put(name, rec);
0602: // bump dir uid to reflect the added entry
0603: //
0604: // this allows "list" uid-based cache validation
0605: dir.setUID(uidService.nextUID());
0606: }
0607: rec.setUID(queryUID);
0608: rec.setTTL(ttl);
0609: rec.setData(queryData);
0610: }
0611:
0612: answer = new Lease(queryUID, ttd);
0613:
0614: if (logger.isDetailEnabled()) {
0615: logger.detail("modify (name=" + name + " query=" + query
0616: + ") returning " + answer);
0617: }
0618:
0619: return answer;
0620: }
0621:
0622: private Object modifyAndForward(String name, Object query, long now) {
0623: // assert (Thread.holdsLock(lock));
0624: Object answer = modify(name, query, config.expireTTD, now);
0625:
0626: if (answer instanceof Lease) {
0627: // forward lease to peers (excluding self and sender)
0628: //
0629: // note that the query can be a UID or a Record. If a UID
0630: // is sent and a peer doesn't know the UID, then that peer
0631: // will send us a "forwardAnswer" with a LeaseNotKnown.
0632: Lease lease = (Lease) answer;
0633: Object queryContent = (query instanceof NameTag ? ((NameTag) query)
0634: .getObject()
0635: : query);
0636: Record record = (queryContent instanceof Record ? ((Record) queryContent)
0637: : null);
0638: Forward fwd = new Forward(lease, record);
0639: // to all
0640: forwardLater(name, fwd);
0641: }
0642:
0643: return answer;
0644: }
0645:
0646: private Object receiveForward(String name, Object query, long now) {
0647: // assert (Thread.holdsLock(lock));
0648:
0649: if (!(query instanceof Forward)) {
0650: // invalid
0651: if (logger.isErrorEnabled()) {
0652: logger.error("Invalid forward (name=" + name
0653: + ", query=" + query + ")");
0654: }
0655: return null;
0656: }
0657:
0658: Forward fwd = (Forward) query;
0659: Lease lease = (Lease) fwd.getLease();
0660: Record record = (Record) fwd.getRecord();
0661: Object modQuery;
0662: if (record == null) {
0663: modQuery = lease.getUID();
0664: } else {
0665: modQuery = record;
0666: }
0667: long modTTD = lease.getTTD();
0668:
0669: // warn if our config.expireTTD is << the modTTD ?
0670: //
0671: // for consistency we'll accept our peer's ttd, since the
0672: // client will renew based upon this ttd. If we use a shorter
0673: // ttd then the client will expire prematurely. In practice
0674: // we expect all the ttds to be equal.
0675:
0676: Object answer = modify(name, modQuery, modTTD, now);
0677:
0678: // filter out leases (they've already been forwarded) and
0679: // denials (since they're likely a transient race condition)
0680: //
0681: // LeaseDenied responses are due to data conflicts. The
0682: // assumption is that our local data is better and either we
0683: // or another peer has already sent the better data to the
0684: // sender.
0685: if (!(answer instanceof LeaseNotKnown)) {
0686: return null;
0687: }
0688:
0689: // send back lease-not-known responses, since our peer
0690: // sent us a UID and we lack the data. The peer should
0691: // reply by forwarding the Record.
0692: return answer;
0693: }
0694:
0695: private Object resendForward(String name, Object query, long now) {
0696: // assert (Thread.holdsLock(lock));
0697:
0698: // find the lease and send the Record data
0699: //
0700: // this is similar to a "lookup" but we only want to find an
0701: // exact match, plus we need the lease ttl and not a lookup ttl
0702: UID queryUID = null;
0703: DirEntry dir = null;
0704: RecordEntry rec = null;
0705: UID uid = null;
0706: long ttd = -1;
0707: String denied = ((!(query instanceof LeaseNotKnown)) ? "query is not of type lease-not-known"
0708: : ((queryUID = ((LeaseNotKnown) query).getUID()) == null) ? "query uid is null"
0709: : (name.charAt(0) == '.') ? "name is invalid"
0710: : ((dir = findDir(name)) == null) ? "directory is null"
0711: : ((rec = dir
0712: .getRecordEntry(name)) == null) ? "no such record in our directory"
0713: : ((uid = rec.getUID()) == null) ? "local uid is null? "
0714: + rec
0715: : (!uid
0716: .equals(queryUID)) ? "our local record has a different uid "
0717: + rec
0718: : ((ttd = rec
0719: .getTTL()
0720: - now) <= 0) ? "our local record has expired"
0721: : (null));
0722: if (denied != null) {
0723: // our local table doesn't contain this entry
0724: //
0725: // the non-matching UID case is assumed to be a race between
0726: // a forward that we've sent and someone asking about the old
0727: // UID. Our forward should arrive soon enough.
0728: if (logger.isDebugEnabled()) {
0729: logger.debug("Ignoring resendForward for (name=" + name
0730: + ", query=" + query + "), " + denied);
0731: }
0732: return null;
0733: }
0734:
0735: Object data = rec.getData();
0736: Lease lease = new Lease(uid, ttd);
0737: Record record = new Record(uid, -1, data);
0738: Forward fwd = new Forward(lease, record);
0739:
0740: // okay, act as if we're forwarding it for the first time,
0741: // but instead of sending it later we can send it back
0742: // to the client.
0743: if (logger.isDebugEnabled()) {
0744: logger.debug("Resending forward: " + fwd);
0745: }
0746: return fwd;
0747: }
0748:
0749: private DirEntry findDir(String name) {
0750: return findOrCreateDir(name, false);
0751: }
0752:
0753: private DirEntry findOrCreateDir(String name) {
0754: return findOrCreateDir(name, true);
0755: }
0756:
0757: private DirEntry findOrCreateDir(String name, boolean create) {
0758:
0759: // extract the dir suffix, e.g.:
0760: // "." -> "."
0761: // "a" -> "."
0762: // "a." -> "."
0763: // "a.b" -> ".b"
0764: // "a.b." -> ".b"
0765: // "a.b.c" -> ".b.c"
0766: // ".d" -> ".d"
0767: // ".d." -> ".d"
0768: // ".d.e" -> ".d.e"
0769: String suffix;
0770: boolean isRoot;
0771: int firstDot = name.indexOf('.');
0772: if (firstDot < 0) {
0773: suffix = ".";
0774: isRoot = true;
0775: } else {
0776: if (firstDot == 0) {
0777: suffix = name;
0778: } else {
0779: suffix = name.substring(firstDot);
0780: }
0781: int n = suffix.length();
0782: if (n == 1) {
0783: suffix = ".";
0784: isRoot = true;
0785: } else {
0786: if (suffix.charAt(n - 1) == '.') {
0787: --n;
0788: }
0789: suffix = suffix.substring(0, n);
0790: isRoot = false;
0791: }
0792: }
0793: // assert (suffix.startsWith("."));
0794: // assert (suffix.equals(".") || !suffix.endsWith("."));
0795:
0796: if (rootDir == null) {
0797: UID uid = uidService.nextUID();
0798: rootDir = new DirEntry(uid);
0799: }
0800: DirEntry dir = rootDir;
0801:
0802: if (isRoot) {
0803: return dir;
0804: }
0805:
0806: // subdir, possibly deep
0807: int i = suffix.lastIndexOf('.');
0808: while (true) {
0809: String s = suffix.substring(i);
0810: Map entries = dir.getEntries();
0811: DirEntry subdir = (DirEntry) entries.get(s);
0812: if (subdir == null) {
0813: // no such dir
0814: if (!create) {
0815: dir = null;
0816: break;
0817: }
0818: dir.setUID(uidService.nextUID()); // bump dir uid
0819: subdir = new DirEntry(uidService.nextUID());
0820: entries.put(s, subdir);
0821: }
0822: // recurse down
0823: dir = subdir;
0824: if (i == 0) {
0825: // found dir
0826: break;
0827: }
0828: i = suffix.lastIndexOf('.', i - 1);
0829: // assert (0 <= i : "invalid suffix: "+suffix);
0830: }
0831:
0832: return dir;
0833: }
0834:
0835: /**
0836: * Given a map of AddressEntries, extract the "version"
0837: * entry's moveId.
0838: * <p>
0839: * The version entry format is:<pre>
0840: * version:///<i>incarnation</i>[/<i>moveId</i>]
0841: * </pre>
0842: * if the moveId is not specified then it's equivalent to the
0843: * incarnation number.
0844: *
0845: * @return zero if the data doesn't contain version information
0846: */
0847: private long getVersion(Object data) {
0848: if (!(data instanceof Map)) {
0849: return 0;
0850: }
0851: Map m = (Map) data;
0852: Object v = m.get("version");
0853: if (!(v instanceof AddressEntry)) {
0854: return 0;
0855: }
0856: AddressEntry ae = (AddressEntry) v;
0857: URI uri = ae.getURI();
0858: if (uri == null) {
0859: return 0;
0860: }
0861: String path = uri.getPath();
0862: if (path == null || path.length() < 1) {
0863: return 0;
0864: }
0865: int sepIdx = path.indexOf('/', 1);
0866: String s;
0867: if (sepIdx < 0) {
0868: s = path.substring(1);
0869: } else {
0870: s = path.substring(sepIdx + 1);
0871: }
0872: long ret;
0873: try {
0874: ret = Long.parseLong(s);
0875: } catch (NumberFormatException nfe) {
0876: return 0;
0877: }
0878: return ret;
0879: }
0880:
0881: /**
0882: * Scan a map of Forward objects to find the max lease ttd,
0883: * which we use to set the message timeout.
0884: */
0885: private long findMaxTTD(Map m) {
0886: long maxTTD = -1;
0887: for (Iterator iter = m.values().iterator(); iter.hasNext();) {
0888: Object o = iter.next();
0889: if (!(o instanceof Forward)) {
0890: continue;
0891: }
0892: Forward fwd = (Forward) o;
0893: Lease lease = fwd.getLease();
0894: long ttd = lease.getTTD();
0895: if (maxTTD < ttd) {
0896: maxTTD = ttd;
0897: }
0898: }
0899: return maxTTD;
0900: }
0901:
0902: /**
0903: * Batch forwards from ourself.
0904: * <p>
0905: * This is simply a performance optimization, since we can
0906: * batch our replications. We can't batch for too long
0907: * relative to our expireTTD, otherwise we might delay a lease
0908: * renewal past its expiration time and our peers will remove
0909: * it. About (0.75*expireTime - deliveryTime) is probably fine.
0910: */
0911: private void forwardLater(String name, Forward fwd) {
0912: // assert (Thread.holdsLock(lock));
0913:
0914: // if the queue already contains a forward with the same uid
0915: // then we should keep the record data of the old forward.
0916: // This occurs when we've queued both a new record (with data)
0917: // and a lease renewal (no data) -- we want to forward the
0918: // latest lease TTL with the record data, otherwise we won't
0919: // forward the data and our peers will complain about a
0920: // "lease-not-known".
0921: Forward newFwd = fwd;
0922: Forward oldFwd = (Forward) forwardQueue.get(name);
0923: if (oldFwd != null) {
0924: Lease lease = fwd.getLease();
0925: UID uid = lease.getUID();
0926: Lease oldLease = oldFwd.getLease();
0927: UID oldUID = oldLease.getUID();
0928: if (uid.equals(oldUID)) {
0929: Record record = fwd.getRecord();
0930: Record oldRecord = oldFwd.getRecord();
0931: if (record == null && oldRecord != null) {
0932: newFwd = new Forward(lease, oldRecord);
0933: }
0934: }
0935: }
0936:
0937: // assert (newFwd != null);
0938: forwardQueue.put(name, newFwd);
0939:
0940: // schedule the thread if it's not scheduled?
0941: //
0942: // the schedulable API makes this tricky, so we'll simply
0943: // keep a steady schedule
0944: }
0945:
0946: private void forwardNow() {
0947: // take the queue
0948: Map m;
0949: synchronized (lock) {
0950: if (forwardQueue.isEmpty()) {
0951: m = null;
0952: } else {
0953: m = new HashMap(forwardQueue);
0954: m = Collections.unmodifiableMap(m);
0955: forwardQueue.clear();
0956: }
0957: }
0958:
0959: if (m != null) {
0960: // find the max expire time for these forwards, so we
0961: // can set the message timeout
0962: long maxTTD = findMaxTTD(m);
0963: forwardService.forward(m, maxTTD);
0964: }
0965:
0966: // run me again later
0967: forwardThread.schedule(config.forwardPeriod);
0968: }
0969:
0970: /**
0971: * Find expired leases and remove them.
0972: *
0973: * @note recursive!
0974: */
0975: private boolean expireLeases(String suffix, DirEntry dir, long now) {
0976: // assert (Thread.holdsLock(lock));
0977:
0978: boolean hasChanged = false;
0979: Map entries = dir.getEntries();
0980: for (Iterator iter = entries.entrySet().iterator(); iter
0981: .hasNext();) {
0982: Map.Entry me = (Map.Entry) iter.next();
0983: String name = (String) me.getKey();
0984: Object value = me.getValue();
0985: if (value instanceof DirEntry) {
0986: // recurse!
0987: DirEntry subdir = (DirEntry) value;
0988: expireLeases(name, subdir, now);
0989: if (subdir.isEmpty()) {
0990: // all expired
0991: iter.remove();
0992: hasChanged = true;
0993: if (logger.isInfoEnabled()) {
0994: logger.info("Expired " + subdir.toString(now));
0995: }
0996: }
0997: } else if (value instanceof RecordEntry) {
0998: RecordEntry re = (RecordEntry) value;
0999: long ttl = re.getTTL();
1000: if (ttl < now) {
1001: // expired
1002: iter.remove();
1003: hasChanged = true;
1004: if (logger.isInfoEnabled()) {
1005: logger.info("Expired " + re.toString(now));
1006: }
1007: } else {
1008: // okay for now
1009: }
1010: } else {
1011: throw new RuntimeException(
1012: "Unexpected DirEntry element: (" + name + "="
1013: + value + ")");
1014: }
1015: }
1016:
1017: if (hasChanged && !dir.isEmpty()) {
1018: // changed the dir "list" contents, so we must change
1019: // the dir's uid.
1020: dir.setUID(uidService.nextUID());
1021: }
1022:
1023: return hasChanged;
1024: }
1025:
1026: private void expireLeases() {
1027: synchronized (lock) {
1028: long now = System.currentTimeMillis();
1029:
1030: DirEntry dir = findDir(".");
1031:
1032: if (logger.isDetailEnabled()) {
1033: StringBuffer buf = new StringBuffer();
1034: buf
1035: .append("##### server entries ##############################");
1036: dir.append(buf, ".", "\n ", now);
1037: buf
1038: .append("\n"
1039: + "###################################################");
1040: logger.detail(buf.toString());
1041: }
1042:
1043: expireLeases(".", dir, now);
1044: }
1045:
1046: // run me again later
1047: expireThread.schedule(config.checkExpirePeriod);
1048: }
1049:
/**
 * Implements all the various client APIs.
 * <p>
 * Every callback does the same thing: it tags the request with the
 * constant identifying the kind of message (PING, LOOKUP, MODIFY,
 * FORWARD or FORWARD_ANSWER) and passes it, with the unchanged
 * arguments, to the enclosing class's <code>handleAll</code>.
 */
private class MyClient implements PingAckService.Client,
        LookupAckService.Client, ModifyAckService.Client,
        ForwardAckService.Client, ForwardService.Client {
    /** Dispatch a ping request to <code>handleAll</code>. */
    public void ping(MessageAddress clientAddr, long clientTime,
            Map m) {
        handleAll(PING, clientAddr, clientTime, m);
    }

    /** Dispatch a lookup request to <code>handleAll</code>. */
    public void lookup(MessageAddress clientAddr, long clientTime,
            Map m) {
        handleAll(LOOKUP, clientAddr, clientTime, m);
    }

    /** Dispatch a modify request to <code>handleAll</code>. */
    public void modify(MessageAddress clientAddr, long clientTime,
            Map m) {
        handleAll(MODIFY, clientAddr, clientTime, m);
    }

    /** Dispatch a forward request to <code>handleAll</code>. */
    public void forward(MessageAddress clientAddr, long clientTime,
            Map m) {
        handleAll(FORWARD, clientAddr, clientTime, m);
    }

    /**
     * Dispatch a forward answer to <code>handleAll</code>; the
     * <code>baseTime</code> is passed in the position the other
     * callbacks use for the client time.
     */
    public void forwardAnswer(MessageAddress clientAddr,
            long baseTime, Map m) {
        handleAll(FORWARD_ANSWER, clientAddr, baseTime, m);
    }
}
1079:
1080: /** config options */
1081: private static class RootConfig {
1082: public final long successTTD;
1083: public final long failTTD;
1084: public final long expireTTD;
1085: public final long forwardPeriod;
1086: public final long checkExpirePeriod;
1087:
1088: public RootConfig(Object o) {
1089: Parameters p = new Parameters(o,
1090: "org.cougaar.core.wp.server.");
1091: successTTD = p.getLong("successTTD", 90000);
1092: failTTD = p.getLong("failTTD", 30000);
1093: expireTTD = p.getLong("expireTTD", 240000);
1094: forwardPeriod = p.getLong("forwardPeriod", 30000);
1095: checkExpirePeriod = p.getLong("checkExpirePeriod", 30000);
1096: }
1097: }
1098:
1099: private static abstract class Entry {
1100: private UID uid;
1101:
1102: public Entry(UID uid) {
1103: _setUID(uid);
1104: }
1105:
1106: private void _setUID(UID uid) {
1107: if (uid == null) {
1108: throw new IllegalArgumentException("null uid");
1109: }
1110: this .uid = uid;
1111: }
1112:
1113: public void setUID(UID uid) {
1114: _setUID(uid);
1115: }
1116:
1117: public UID getUID() {
1118: return uid;
1119: }
1120:
1121: public String toString() {
1122: long now = System.currentTimeMillis();
1123: return toString(now);
1124: }
1125:
1126: public abstract String toString(long now);
1127: }
1128:
1129: private static class DirEntry extends Entry {
1130:
1131: // the child entries, which can be a mix of
1132: // dir-entries and record-entries.
1133: //
1134: // The string key for dir-entries always start
1135: // with a '.', and record-entries never start
1136: // with a '.'.
1137: //
1138: // <String, Entry>
1139: private final Map entries = new HashMap();
1140:
1141: public DirEntry(UID uid) {
1142: super (uid);
1143: }
1144:
1145: public boolean isEmpty() {
1146: return entries.isEmpty();
1147: }
1148:
1149: public RecordEntry getRecordEntry(String name) {
1150: return ((name.charAt(0) == '.') ? (null)
1151: : (RecordEntry) entries.get(name));
1152: }
1153:
1154: // the client can directly modify this map
1155: public Map getEntries() {
1156: return entries;
1157: }
1158:
1159: public String toString(long now) {
1160: StringBuffer buf = new StringBuffer();
1161: append(buf, "?", "\n ", now);
1162: return buf.toString();
1163: }
1164:
1165: /** @note recursive! */
1166: public void append(StringBuffer buf, String suffix,
1167: String indent, long now) {
1168: // assert (Thread.holdsLock(lock));
1169: // assert (indent.startsWith("\n"));
1170:
1171: buf.append(indent).append("suffix=").append(suffix);
1172: buf.append(indent).append("uid=").append(getUID());
1173:
1174: Map m = this .getEntries();
1175:
1176: // sort
1177: Object[] keys = m.keySet().toArray();
1178: Arrays.sort(keys);
1179:
1180: buf.append(indent).append("entries[");
1181: buf.append(m.size()).append("]={");
1182:
1183: String subindent = indent + " ";
1184: for (int i = 0; i < keys.length; i++) {
1185: String name = (String) keys[i];
1186: Object value = m.get(name);
1187: if (value instanceof DirEntry) {
1188: // recurse!
1189: DirEntry subdir = (DirEntry) value;
1190: subdir.append(buf, name, subindent, now);
1191: } else if (value instanceof RecordEntry) {
1192: RecordEntry re = (RecordEntry) value;
1193: buf.append(subindent).append(name).append("=");
1194: buf.append(re.toString(now));
1195: } else {
1196: throw new RuntimeException(
1197: "Unexpected DirEntry element: (" + name
1198: + "=" + value + ")");
1199: }
1200: }
1201:
1202: buf.append(indent).append("}");
1203: }
1204: }
1205:
1206: private static class RecordEntry extends Entry {
1207:
1208: private long ttl;
1209: private Object data;
1210:
1211: public RecordEntry(UID uid) {
1212: super (uid);
1213: }
1214:
1215: public void setTTL(long ttl) {
1216: this .ttl = ttl;
1217: }
1218:
1219: public long getTTL() {
1220: return ttl;
1221: }
1222:
1223: public void setData(Object data) {
1224: this .data = data;
1225: }
1226:
1227: public Object getData() {
1228: return data;
1229: }
1230:
1231: public String toString(long now) {
1232: return "(record uid=" + getUID() + " ttl="
1233: + Timestamp.toString(ttl, now) + " data=" + data
1234: + ")";
1235: }
1236: }
1237: }
|