0001: /*
0002: * <copyright>
0003: *
0004: * Copyright 2002-2004 BBNT Solutions, LLC
0005: * under sponsorship of the Defense Advanced Research Projects
0006: * Agency (DARPA).
0007: *
0008: * You can redistribute this software and/or modify it under the
0009: * terms of the Cougaar Open Source License as published on the
0010: * Cougaar Open Source Website (www.cougaar.org).
0011: *
0012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023: *
0024: * </copyright>
0025: */
0026:
0027: package org.cougaar.core.wp.resolver;
0028:
0029: import java.util.ArrayList;
0030: import java.util.Collections;
0031: import java.util.HashMap;
0032: import java.util.Iterator;
0033: import java.util.List;
0034: import java.util.Map;
0035: import java.util.Set;
0036: import org.cougaar.core.component.Component;
0037: import org.cougaar.core.component.ServiceBroker;
0038: import org.cougaar.core.component.ServiceProvider;
0039: import org.cougaar.core.component.ServiceRevokedListener;
0040: import org.cougaar.core.mts.MessageAddress;
0041: import org.cougaar.core.node.NodeControlService;
0042: import org.cougaar.core.service.LoggingService;
0043: import org.cougaar.core.service.ThreadService;
0044: import org.cougaar.core.service.UIDService;
0045: import org.cougaar.core.service.wp.AddressEntry;
0046: import org.cougaar.core.service.wp.Request;
0047: import org.cougaar.core.service.wp.Response;
0048: import org.cougaar.core.thread.Schedulable;
0049: import org.cougaar.core.util.UID;
0050: import org.cougaar.core.wp.Parameters;
0051: import org.cougaar.core.wp.Timestamp;
0052: import org.cougaar.core.wp.bootstrap.Bundle;
0053: import org.cougaar.core.wp.bootstrap.BundleService;
0054: import org.cougaar.util.GenericStateModelAdapter;
0055: import org.cougaar.util.RarelyModifiedList;
0056:
0057: /**
0058: * This component watches for bind/unbind requests and maintains
0059: * the leases in the server.
0060: */
0061: public class LeaseManager extends GenericStateModelAdapter implements
0062: Component {
0063:
0064: private LeaserConfig config;
0065:
0066: // name -> ActiveLease
0067: // Map<String, ActiveLease>
0068: private final Map leases = new HashMap();
0069:
0070: private ServiceBroker sb;
0071: private ServiceBroker rootsb;
0072:
0073: private LoggingService logger;
0074: private ThreadService threadService;
0075: private UIDService uidService;
0076: private ModifyService modifyService;
0077:
0078: private final ModifyService.Client myClient = new ModifyService.Client() {
0079: public void modifyAnswer(long baseTime, Map m) {
0080: LeaseManager.this .modifyAnswer(baseTime, m);
0081: }
0082: };
0083:
0084: private LeaseSP leaseSP;
0085: private BundleSP bundleSP;
0086:
0087: private final RarelyModifiedList listeners = new RarelyModifiedList();
0088:
0089: //
0090: // renew leases:
0091: //
0092:
0093: private Schedulable renewLeasesThread;
0094:
0095: public void setParameter(Object o) {
0096: configure(o);
0097: }
0098:
0099: public void setServiceBroker(ServiceBroker sb) {
0100: this .sb = sb;
0101: }
0102:
0103: public void setLoggingService(LoggingService logger) {
0104: this .logger = logger;
0105: }
0106:
0107: public void setThreadService(ThreadService threadService) {
0108: this .threadService = threadService;
0109: }
0110:
0111: public void setUIDService(UIDService uidService) {
0112: this .uidService = uidService;
0113: }
0114:
0115: private void configure(Object o) {
0116: if (config != null) {
0117: return;
0118: }
0119: config = new LeaserConfig(o);
0120: }
0121:
0122: public void load() {
0123: super .load();
0124:
0125: configure(null);
0126:
0127: // register for lookups
0128: modifyService = (ModifyService) sb.getService(myClient,
0129: ModifyService.class, null);
0130: if (modifyService == null) {
0131: throw new RuntimeException("Unable to obtain ModifyService");
0132: }
0133:
0134: Runnable renewLeasesRunner = new Runnable() {
0135: public void run() {
0136: // assert (thread == renewLeasesThread);
0137: renewLeases();
0138: }
0139: };
0140: renewLeasesThread = threadService.getThread(this ,
0141: renewLeasesRunner, "White pages server renew leases");
0142: renewLeasesThread.schedule(config.checkLeasesPeriod);
0143:
0144: NodeControlService ncs = (NodeControlService) sb.getService(
0145: this , NodeControlService.class, null);
0146: if (ncs != null) {
0147: rootsb = ncs.getRootServiceBroker();
0148: sb.releaseService(this , NodeControlService.class, ncs);
0149: }
0150:
0151: // advertise our services
0152: leaseSP = new LeaseSP();
0153: sb.addService(LeaseService.class, leaseSP);
0154: bundleSP = new BundleSP();
0155: ServiceBroker bundleSB = (rootsb == null ? sb : rootsb);
0156: bundleSB.addService(BundleService.class, bundleSP);
0157: }
0158:
0159: public void unload() {
0160: // cancel existing scheduled threads
0161: renewLeasesThread.cancel();
0162:
0163: // release services
0164: if (bundleSP != null) {
0165: ServiceBroker bundleSB = (rootsb == null ? sb : rootsb);
0166: bundleSB.revokeService(BundleService.class, bundleSP);
0167: bundleSP = null;
0168: }
0169: if (leaseSP != null) {
0170: sb.revokeService(LeaseService.class, leaseSP);
0171: leaseSP = null;
0172: }
0173: if (modifyService != null) {
0174: sb.releaseService(myClient, ModifyService.class,
0175: modifyService);
0176: modifyService = null;
0177: }
0178: if (uidService != null) {
0179: sb.releaseService(this , UIDService.class, uidService);
0180: uidService = null;
0181: }
0182: if (threadService != null) {
0183: // halt our threads?
0184: sb.releaseService(this , ThreadService.class, threadService);
0185: threadService = null;
0186: }
0187: if (logger != null) {
0188: sb.releaseService(this , LoggingService.class, logger);
0189: logger = null;
0190: }
0191: super .unload();
0192: }
0193:
0194: private Bundle getBundle(String name) {
0195: synchronized (leases) {
0196: ActiveLease lease = (ActiveLease) leases.get(name);
0197: return asBundle(name, lease);
0198: }
0199: }
0200:
0201: private Map getAllBundles() {
0202: synchronized (leases) {
0203: Map ret = new HashMap(leases.size());
0204: for (Iterator iter = leases.entrySet().iterator(); iter
0205: .hasNext();) {
0206: Map.Entry me = (Map.Entry) iter.next();
0207: String name = (String) me.getKey();
0208: ActiveLease lease = (ActiveLease) me.getValue();
0209: Bundle bundle = asBundle(name, lease);
0210: if (bundle == null) {
0211: continue;
0212: }
0213: ret.put(name, bundle);
0214: }
0215: return ret;
0216: }
0217: }
0218:
0219: private Bundle asBundle(String name, ActiveLease lease) {
0220: if (lease == null) {
0221: return null;
0222: }
0223: Record record = lease.record;
0224: if (record == null) {
0225: return null;
0226: }
0227: UID uid = record.getUID();
0228: long ttd;
0229: if (lease.sendTime > 0) {
0230: // still pending
0231: ttd = config.minBundleTTD;
0232: } else {
0233: // use server ttd
0234: ttd = lease.expireTime - lease.boundTime;
0235: if (ttd < config.minBundleTTD) {
0236: ttd = config.minBundleTTD;
0237: }
0238: }
0239: Map entries = (Map) record.getData();
0240: Bundle bundle = new Bundle(name, uid, ttd, entries);
0241: return bundle;
0242: }
0243:
0244: private void register(BundleService.Client bsc) {
0245: listeners.add(bsc);
0246: bsc.addAll(getAllBundles());
0247: }
0248:
0249: private void unregister(BundleService.Client bsc) {
0250: listeners.remove(bsc);
0251: }
0252:
0253: private void submit(Response res, String agent) {
0254: // watch for bind/unbind
0255: Request req = res.getRequest();
0256: if (req instanceof Request.Bind) {
0257: if (req.hasOption(Request.CACHE_ONLY)) {
0258: res.setResult(new Long(Long.MAX_VALUE));
0259: } else {
0260: bind(res, agent, (Request.Bind) req);
0261: }
0262: } else if (req instanceof Request.Unbind) {
0263: if (req.hasOption(Request.CACHE_ONLY)) {
0264: res.setResult(Boolean.TRUE);
0265: } else {
0266: unbind(res, agent, (Request.Unbind) req);
0267: }
0268: } else {
0269: // ignore
0270: }
0271: }
0272:
0273: private void bind(Response res, String agent, Request.Bind req) {
0274: long activeExpire = -1;
0275: List cancelledResponses = null;
0276:
0277: boolean createNewLease = false;
0278: String unifiedAgent = null;
0279: Record record = null;
0280: Bundle bundle = null;
0281:
0282: AddressEntry ae = req.getAddressEntry();
0283: String name = ae.getName();
0284: String type = ae.getType();
0285:
0286: if (logger.isInfoEnabled() && !req.isOverWrite()) {
0287: logger.info("Warning: treating bind as rebind: " + req);
0288: }
0289:
0290: synchronized (leases) {
0291: AddressEntry oldAE = null;
0292: List oldResponses = null;
0293: Map oldData = null;
0294:
0295: ActiveLease lease = (ActiveLease) leases.get(name);
0296: if (lease != null) {
0297: // get the bound/pending lease's entry
0298: oldData = (Map) lease.record.getData();
0299: oldAE = (oldData == null ? (null)
0300: : (AddressEntry) oldData.get(type));
0301: }
0302:
0303: if (lease == null) {
0304: // new bind-pending lease
0305: createNewLease = true;
0306: } else if (ae.equals(oldAE)) {
0307: // the lease is active, so we know the status
0308: activeExpire = lease.getExpirationTime();
0309: if (logger.isDetailEnabled()) {
0310: logger.detail("lease already active: " + lease);
0311: }
0312: } else if (!lease.isBound() && ae.equals(oldAE)) {
0313: // already in progress, we don't know the
0314: // status yet, so batch with our pending request.
0315: lease.addResponse(res);
0316: if (logger.isDetailEnabled()) {
0317: logger.detail("lease rebind " + ae
0318: + " already in progress, batching: "
0319: + lease);
0320: }
0321: // batched here
0322: } else {
0323: // cancel old lease, initiate a new one
0324: //
0325: // Note that multiple pending binds may be in progress, so
0326: // we must handle these outstanding requests. There are
0327: // two options:
0328: // a) Cancel the outstanding requests and ignore the
0329: // WP's answers when they arrive
0330: // b) Send both asynchronously and tell the requests
0331: // their answers, even if they conflict.
0332: // We'll go with (a) and cancel the pending requests.
0333: // This is more consistent with the notion of a
0334: // client-side lease manager, since only one binding will
0335: // be maintained and conflicts are a client-side error.
0336: //
0337: // Unbind must do a similar cancel.
0338: //
0339: createNewLease = true;
0340: // clear the old responses for cancelling
0341: cancelledResponses = lease.takeResponses(type);
0342: // keep the rest
0343: oldResponses = lease.takeResponses();
0344: }
0345:
0346: if (createNewLease) {
0347: // create a new lease and replace the old one
0348: if (oldAE != null && logger.isInfoEnabled()) {
0349: logger.info("Binding replacement entry " + ae
0350: + " for current entry " + oldAE
0351: + " in lease " + lease);
0352: }
0353:
0354: Map data;
0355: int oldSize = (oldData == null ? 0 : oldData.size());
0356: if (oldSize == 0
0357: || (oldSize == 1 && oldData.containsKey(type))) {
0358: data = Collections.singletonMap(type, ae);
0359: unifiedAgent = agent;
0360: } else {
0361: data = new HashMap(oldData);
0362: data.put(type, ae);
0363: data = Collections.unmodifiableMap(data);
0364: // figure out the "agent" owner, which is usually a mix of
0365: // null (for the MTS link) and a single agent (for the
0366: // clients within that agent). If it's some other mix then
0367: // we'll generate a warning.
0368: String oldAgent = lease.agent;
0369: if (oldAgent == null) {
0370: unifiedAgent = agent;
0371: } else if (oldAgent.equals(agent) || agent == null) {
0372: unifiedAgent = oldAgent;
0373: } else {
0374: // conflict!
0375: unifiedAgent = agent;
0376: if (logger.isWarnEnabled()) {
0377: logger.warn("Agent " + agent
0378: + "'s lease request " + ae
0379: + " is mixing with agent "
0380: + oldAgent + "'s data for name="
0381: + name + "=" + oldData
0382: + ", assigning ownership to agent "
0383: + agent);
0384: }
0385: }
0386: }
0387: UID uid = uidService.nextUID();
0388: record = new Record(uid, -1, data);
0389: lease = new ActiveLease(unifiedAgent, record);
0390: leases.put(name, lease);
0391: if (oldResponses != null) {
0392: lease.addResponses(oldResponses);
0393: }
0394: lease.addResponse(res);
0395:
0396: if (oldAE == null && logger.isInfoEnabled()) {
0397: logger.info("Binding new lease: " + lease);
0398: }
0399:
0400: bundle = asBundle(name, lease);
0401: }
0402: }
0403:
0404: if (cancelledResponses != null) {
0405: // cancel conflicting pending binds
0406: for (int i = 0, n = cancelledResponses.size(); i < n; i++) {
0407: Response cancelledRes = (Response) cancelledResponses
0408: .get(i);
0409: cancelledRes.setResult(req);
0410: }
0411: }
0412: if (0 < activeExpire) {
0413: // lease already bound
0414: res.setResult(new Long(activeExpire));
0415: return;
0416: }
0417:
0418: if (!createNewLease) {
0419: return;
0420: }
0421:
0422: // tell listeners
0423: if (bundle != null) {
0424: List l = listeners.getUnmodifiableList();
0425: for (Iterator iter = l.iterator(); iter.hasNext();) {
0426: BundleService.Client bsc = (BundleService.Client) iter
0427: .next();
0428: bsc.add(name, bundle);
0429: }
0430: }
0431:
0432: // send
0433: Object o = record;
0434: if (unifiedAgent != null) {
0435: o = new NameTag(unifiedAgent, o);
0436: }
0437: Map m = Collections.singletonMap(name, o);
0438: modifyService.modify(m);
0439: }
0440:
0441: private void unbind(Response res, String agent, Request.Unbind req) {
0442: boolean bind = false;
0443:
0444: AddressEntry ae = req.getAddressEntry();
0445: String name = ae.getName();
0446: String type = ae.getType();
0447:
0448: List cancelledResponses = null;
0449:
0450: boolean createNewLease = false;
0451: String unifiedAgent = null;
0452:
0453: Record record = null;
0454: Bundle bundle = null;
0455:
0456: synchronized (leases) {
0457: AddressEntry oldAE = null;
0458: Map oldData = null;
0459: List oldResponses = null;
0460:
0461: ActiveLease lease = (ActiveLease) leases.get(name);
0462: if (lease != null) {
0463: // get the old entry
0464: record = lease.record;
0465: oldData = (Map) record.getData();
0466: oldAE = (oldData == null ? (null)
0467: : (AddressEntry) oldData.get(type));
0468: }
0469:
0470: if (lease == null) {
0471: // not bound, nothing to unbind
0472: } else if (ae.equals(oldAE)) {
0473: // found exact match
0474: // cancel any pending requests
0475: //
0476: // this can also be used to intentionally cancel a
0477: // pending local bind.
0478: cancelledResponses = lease.takeResponses(type);
0479: oldResponses = lease.takeResponses();
0480: createNewLease = true;
0481: } else {
0482: // was not bound
0483: }
0484:
0485: if (createNewLease) {
0486: // create a new lease and replace the old one
0487: if (logger.isInfoEnabled()) {
0488: logger.info("Unbinding entry " + ae + " in lease "
0489: + lease);
0490: }
0491:
0492: Map data;
0493: if (oldData == null || oldData.isEmpty()) {
0494: if (bind) {
0495: data = Collections.singletonMap(type, ae);
0496: } else {
0497: data = Collections.EMPTY_MAP;
0498: }
0499: } else {
0500: data = new HashMap(oldData);
0501: if (bind) {
0502: data.put(type, ae);
0503: } else {
0504: data.remove(type);
0505: }
0506: data = Collections.unmodifiableMap(data);
0507: }
0508: unifiedAgent = (lease == null ? agent : lease.agent);
0509: UID uid = uidService.nextUID();
0510: record = new Record(uid, -1, data);
0511: if (!bind && oldData.size() == 1) {
0512: leases.remove(name);
0513: lease = null;
0514: } else {
0515: lease = new ActiveLease(unifiedAgent, record);
0516: leases.put(name, lease);
0517: if (oldResponses != null) {
0518: lease.addResponses(oldResponses);
0519: }
0520: }
0521:
0522: if (logger.isInfoEnabled()) {
0523: logger.info("New lease: " + lease);
0524: }
0525:
0526: bundle = asBundle(name, lease);
0527: }
0528: }
0529:
0530: if (cancelledResponses != null) {
0531: // cancel conflicting pending binds
0532: for (int i = 0, n = cancelledResponses.size(); i < n; i++) {
0533: Response cancelledRes = (Response) cancelledResponses
0534: .get(i);
0535: cancelledRes.setResult(req);
0536: }
0537: }
0538:
0539: if (!bind) {
0540: // it's not bound from the local lease's point of view
0541: //
0542: // if it's bound in the server without our knowledge,
0543: // the lease expiration will unbind it for us
0544: res.setResult(Boolean.TRUE);
0545: }
0546:
0547: if (!createNewLease) {
0548: return;
0549: }
0550:
0551: // tell listeners
0552: if (bundle != null) {
0553: Map entries = bundle.getEntries();
0554: boolean rem = (entries == null || entries.isEmpty());
0555: List l = listeners.getUnmodifiableList();
0556: for (Iterator iter = l.iterator(); iter.hasNext();) {
0557: BundleService.Client bsc = (BundleService.Client) iter
0558: .next();
0559: if (rem) {
0560: bsc.remove(name, bundle);
0561: } else {
0562: bsc.add(name, bundle);
0563: }
0564: }
0565: }
0566:
0567: // send
0568: Object o = record;
0569: if (unifiedAgent != null) {
0570: o = new NameTag(unifiedAgent, o);
0571: }
0572: Map m = Collections.singletonMap(name, o);
0573: modifyService.modify(m);
0574: }
0575:
0576: private void modifyAnswer(long baseTime, Map m) {
0577: for (Iterator iter = m.entrySet().iterator(); iter.hasNext();) {
0578: Map.Entry me = (Map.Entry) iter.next();
0579: String name = (String) me.getKey();
0580: Object value = me.getValue();
0581: if (value instanceof Lease) {
0582: Lease l = (Lease) value;
0583: UID uid = l.getUID();
0584: long ttd = l.getTTD();
0585: leaseSuccess(name, uid, baseTime, ttd);
0586: } else if (value instanceof LeaseNotKnown) {
0587: LeaseNotKnown lnk = (LeaseNotKnown) value;
0588: UID uid = lnk.getUID();
0589: leaseNotKnown(name, uid);
0590: } else if (value instanceof LeaseDenied) {
0591: LeaseDenied ld = (LeaseDenied) value;
0592: UID uid = ld.getUID();
0593: Object reason = ld.getReason();
0594: leaseDenied(name, uid, reason);
0595: } else {
0596: if (logger.isErrorEnabled()) {
0597: logger.error("Unexpected modify answer: (baseTime="
0598: + Timestamp.toString(baseTime)
0599: + ", name="
0600: + name
0601: + ", value="
0602: + (value == null ? "" : "("
0603: + value.getClass().getName() + ")")
0604: + value);
0605: }
0606: }
0607: }
0608: }
0609:
0610: private boolean matchesLease(String name, UID uid,
0611: ActiveLease lease, String info) {
0612:
0613: if (lease != null && uid.equals(lease.record.getUID())) {
0614: return true;
0615: }
0616:
0617: // ignore the response.
0618: //
0619: // The ModifyService usually protects us against this, but
0620: // sometimes races can occur.
0621: //
0622: // if the lease is null:
0623: // either we never bound this entry (e.g. restart) or we've
0624: // recently unbound the entry and a stale ack/renewal has
0625: // raced back.
0626: // or
0627: // we sent a bind followed by a replacement bind, and we're
0628: // waiting for the ack on that second bind. This is similar
0629: // to the above "lease == null" race.
0630: if (logger.isDebugEnabled()) {
0631: logger.debug("Ignoring a lease answer that we didn't send?"
0632: + " info=" + info + " name=" + name + " uid=" + uid
0633: + " lease=" + lease);
0634: }
0635: return false;
0636: }
0637:
0638: private void leaseSuccess(String name, UID uid, long baseTime,
0639: long ttd) {
0640: List responses;
0641: synchronized (leases) {
0642: ActiveLease lease = (ActiveLease) leases.get(name);
0643: if (!matchesLease(name, uid, lease, "success")) {
0644: return;
0645: }
0646: responses = leaseSuccess(lease, baseTime, ttd);
0647: if (responses == null || responses.isEmpty()) {
0648: return;
0649: }
0650: }
0651: Object result = new Long(baseTime + ttd);
0652: for (int i = 0, n = responses.size(); i < n; i++) {
0653: Response res = (Response) responses.get(i);
0654: res.setResult(result);
0655: }
0656: }
0657:
0658: private List leaseSuccess(ActiveLease lease, long baseTime, long ttd) {
0659:
0660: long ttl = baseTime + ttd;
0661:
0662: boolean renewal = (0 < lease.expireTime);
0663:
0664: // good, we've created a new lease or
0665: // renewed an existing lease
0666: long now = System.currentTimeMillis();
0667: renewed(lease, now, ttl);
0668:
0669: if (renewal) {
0670: if (logger.isDebugEnabled()) {
0671: logger.debug("Renewed lease=" + lease);
0672: }
0673: } else {
0674: if (logger.isInfoEnabled()) {
0675: logger.info("Established lease: " + lease);
0676: }
0677: }
0678:
0679: return lease.takeResponses();
0680: }
0681:
0682: private void leaseNotKnown(String name, UID uid) {
0683: synchronized (leases) {
0684: ActiveLease lease = (ActiveLease) leases.get(name);
0685: if (!matchesLease(name, uid, lease, "not-known")) {
0686: return;
0687: }
0688: leaseNotKnown(lease, name, uid);
0689: }
0690: // we don't tell anyone, since we resend the uid-based
0691: // renewal with a full record-based renewal
0692: }
0693:
0694: private void leaseNotKnown(ActiveLease lease, String name, UID uid) {
0695:
0696: // send again, but this time send the full record instead
0697: // of just the UID
0698: String agent = lease.agent;
0699: Record record = lease.record;
0700:
0701: // FIXME what if record.ttd < 0? shouldn't happen...
0702:
0703: // FIXME tag lease?
0704:
0705: if (logger.isDebugEnabled()) {
0706: logger.debug("Resending lease-not-known uid=" + uid
0707: + " for our active lease " + lease);
0708: }
0709:
0710: Object o = record;
0711: if (agent != null) {
0712: o = new NameTag(agent, o);
0713: }
0714: Map m = Collections.singletonMap(name, o);
0715: modifyService.modify(m);
0716: }
0717:
0718: private void leaseDenied(String name, UID uid, Object reason) {
0719: List responses;
0720: synchronized (leases) {
0721: ActiveLease lease = (ActiveLease) leases.get(name);
0722: if (!matchesLease(name, uid, lease, "denied")) {
0723: return;
0724: }
0725: responses = leaseDenied(lease, name, uid, reason);
0726: if (responses == null || responses.isEmpty()) {
0727: return;
0728: }
0729: }
0730: for (int i = 0, n = responses.size(); i < n; i++) {
0731: Response res = (Response) responses.get(i);
0732: res.getRequest();
0733: res.setResult(reason);
0734: }
0735: }
0736:
0737: private List leaseDenied(ActiveLease lease, String name, UID uid,
0738: Object reason) {
0739:
0740: // we've lost all our lease entries
0741: //
0742: // see the Record javadocs for future "bind-only" enhancements
0743:
0744: leases.remove(name);
0745:
0746: if (logger.isWarnEnabled()) {
0747: logger.warn("Lost lease "
0748: + ((0 < lease.expireTime) ? "renewal" : "creation")
0749: + " for " + "(name=" + name + " uid=" + uid
0750: + " reason=" + reason + "), dead lease is: "
0751: + lease);
0752: }
0753:
0754: // FIXME tell agent suicide watcher?
0755:
0756: // we'll fail whatever's pending
0757: return lease.takeResponses();
0758: }
0759:
0760: /**
0761: * Check our leases and renew them if they'll expire soon.
0762: */
0763: private void renewLeases() {
0764: long now;
0765: Map m = null;
0766: synchronized (leases) {
0767: now = System.currentTimeMillis();
0768: for (Iterator iter = leases.entrySet().iterator(); iter
0769: .hasNext();) {
0770: Map.Entry me = (Map.Entry) iter.next();
0771: String name = (String) me.getKey();
0772: ActiveLease lease = (ActiveLease) me.getValue();
0773: boolean renewNow = shouldRenew(name, lease, now);
0774: if (!renewNow) {
0775: continue;
0776: }
0777: String agent = lease.agent;
0778: UID uid = lease.record.getUID();
0779: if (m == null) {
0780: m = new HashMap();
0781: }
0782: Object o = uid;
0783: if (agent != null) {
0784: o = new NameTag(agent, o);
0785: }
0786: m.put(name, o);
0787: }
0788: }
0789:
0790: if (m != null) {
0791: modifyService.modify(m);
0792: }
0793:
0794: // run me again later
0795: renewLeasesThread.schedule(config.checkLeasesPeriod);
0796: }
0797:
0798: //
0799: // These are logically part of lease but require
0800: // access to the config of the outter class
0801: //
0802:
0803: private boolean shouldRenew(String name, ActiveLease lease, long now) {
0804: // calculate renewal time based upon:
0805: // expiration time
0806: // round-trip time for the last renewal delay
0807: // some slack for the above round-trip time
0808: // added safety in case the server forgets us
0809: //
0810: // here's the current guess:
0811: //
0812: // figure out the latest time we could renew
0813: if (0 < lease.sendTime) {
0814: // we're still waiting for the last renewal ack
0815: if (logger.isDetailEnabled()) {
0816: logger.detail("lease (name=" + name + ", uid="
0817: + lease.record.getUID()
0818: + ") is still pending: " + lease.toString(now));
0819: }
0820: return false;
0821: }
0822: long latestRenew = lease.expireTime - lease.roundTripTime;
0823: // weight it to be a little early
0824: long renewalTime = (long) (lease.boundTime + (config.renewRatio * (latestRenew - lease.boundTime)));
0825: // adjust for timer period
0826: renewalTime -= config.checkLeasesPeriod;
0827: if (now < renewalTime) {
0828: if (logger.isDetailEnabled()) {
0829: logger.detail("lease (name=" + name + ", uid="
0830: + lease.record.getUID()
0831: + ") doesn't need to be renewed until "
0832: + Timestamp.toString(renewalTime, now) + ": "
0833: + lease.toString(now));
0834: }
0835: return false;
0836: }
0837: // renew, mark the sendtime for round-trip measurement
0838: lease.sendTime = now;
0839: if (logger.isDebugEnabled()) {
0840: logger.debug("Renewing lease (name=" + name + ", uid="
0841: + lease.record.getUID() + ") that will expire at "
0842: + Timestamp.toString(lease.expireTime) + ": "
0843: + lease.toString(now));
0844: }
0845: return true;
0846: }
0847:
0848: private void renewed(ActiveLease lease, long now, long expTime) {
0849: // set our timestamps
0850: if (0 < lease.sendTime) {
0851: long tripTime = now - lease.sendTime;
0852: // soften this by averaging
0853: long weightedTripTime;
0854: if (lease.roundTripTime == 0) {
0855: // first time
0856: weightedTripTime = tripTime;
0857: } else {
0858: weightedTripTime = (long) (config.tripWeight * tripTime + (1.0 - config.tripWeight)
0859: * lease.roundTripTime);
0860: }
0861: lease.roundTripTime = weightedTripTime;
0862: lease.sendTime = 0;
0863: } else {
0864: // we don't recall sending this renewal, but we'll
0865: // accept it anyways.
0866: }
0867: lease.boundTime = now;
0868: // this expTime may be in the past, but it was a successful
0869: // bind and our timer will renew it soon.
0870: lease.expireTime = expTime;
0871: }
0872:
0873: /** config options */
0874: private static class LeaserConfig {
0875: public final double renewRatio;
0876: public final double tripWeight;
0877: public final long minBundleTTD;
0878: public final long checkLeasesPeriod;
0879:
0880: public LeaserConfig(Object o) {
0881: Parameters p = new Parameters(o,
0882: "org.cougaar.core.wp.resolver.lease.");
0883: renewRatio = p.getDouble("renewRatio", 0.75);
0884: tripWeight = p.getDouble("tripWeight", 0.75);
0885: minBundleTTD = p.getLong("minBundleTTD", 60000);
0886: checkLeasesPeriod = p.getLong("checkLeasesPeriod", 20000);
0887: }
0888: }
0889:
0890: private static class ActiveLease {
0891:
0892: private static final List NO_RESPONSES = Collections.EMPTY_LIST;
0893:
0894: public final String agent;
0895: public final Record record;
0896:
0897: public final long bindTime;
0898: public long boundTime;
0899: public long sendTime;
0900: public long roundTripTime;
0901: public long expireTime;
0902:
0903: private List responses;
0904:
0905: public ActiveLease(String agent, Record record) {
0906: this .agent = agent;
0907: this .record = record;
0908: long now = System.currentTimeMillis();
0909: bindTime = now;
0910: boundTime = 0;
0911: sendTime = now;
0912: roundTripTime = 0;
0913: expireTime = 0;
0914: responses = NO_RESPONSES;
0915: }
0916:
0917: public String getAgent() {
0918: return agent;
0919: }
0920:
0921: public Record getRecord() {
0922: return record;
0923: }
0924:
0925: public boolean isBound() {
0926: return 0 < boundTime;
0927: }
0928:
0929: public long getExpirationTime() {
0930: return expireTime;
0931: }
0932:
0933: public void addResponses(List l) {
0934: for (int i = 0, n = l.size(); i < n; i++) {
0935: Response res = (Response) l.get(i);
0936: addResponse(res);
0937: }
0938: }
0939:
0940: public void addResponse(Response res) {
0941: if (!(res instanceof Response.Bind)) {
0942: throw new IllegalArgumentException("Non-bind res: "
0943: + res);
0944: }
0945: if (isBound()) {
0946: throw new IllegalStateException(
0947: "Lease is already bound, not expecting responses: "
0948: + this );
0949: }
0950: if (responses == NO_RESPONSES) {
0951: responses = new ArrayList(3);
0952: }
0953: responses.add(res);
0954: }
0955:
0956: public List takeResponses(String type) {
0957: if (type == null) {
0958: throw new IllegalArgumentException("null type");
0959: }
0960: List ret = NO_RESPONSES;
0961: for (int i = 0, n = responses.size(); i < n; i++) {
0962: Response res = (Response) responses.get(i);
0963: Request req = res.getRequest();
0964: AddressEntry ae;
0965: if (req instanceof Request.Bind) {
0966: ae = ((Request.Bind) req).getAddressEntry();
0967: } else if (req instanceof Request.Unbind) {
0968: ae = ((Request.Unbind) req).getAddressEntry();
0969: } else {
0970: // invalid?
0971: continue;
0972: }
0973: if (!type.equals(ae.getType())) {
0974: continue;
0975: }
0976: if (ret.isEmpty()) {
0977: ret = new ArrayList();
0978: ret.add(res);
0979: }
0980: responses.remove(i);
0981: --i;
0982: --n;
0983: if (n <= 0) {
0984: responses = NO_RESPONSES;
0985: }
0986: }
0987: return ret;
0988: }
0989:
0990: public List takeResponses() {
0991: List l = responses;
0992: responses = NO_RESPONSES;
0993: return l;
0994: }
0995:
0996: public String toString() {
0997: long now = System.currentTimeMillis();
0998: return toString(now);
0999: }
1000:
1001: public String toString(long now) {
1002: return "(lease" + " agent=" + agent + " record="
1003: + record.toString(bindTime, now) + " bindTime="
1004: + Timestamp.toString(bindTime, now) + " boundTime="
1005: + Timestamp.toString(boundTime, now) + " sendTime="
1006: + Timestamp.toString(sendTime, now)
1007: + " roundTripTime=" + roundTripTime
1008: + " expireTime="
1009: + Timestamp.toString(expireTime, now) + " pending["
1010: + responses.size() + "]=" + responses + ")";
1011: }
1012: }
1013:
1014: private class LeaseSP implements ServiceProvider {
1015: private final LeaseService ls = new LeaseService() {
1016: public void submit(Response res, String agent) {
1017: LeaseManager.this .submit(res, agent);
1018: }
1019: };
1020:
1021: public Object getService(ServiceBroker sb, Object requestor,
1022: Class serviceClass) {
1023: if (!LeaseService.class.isAssignableFrom(serviceClass)) {
1024: return null;
1025: }
1026: return ls;
1027: }
1028:
1029: public void releaseService(ServiceBroker sb, Object requestor,
1030: Class serviceClass, Object service) {
1031: }
1032: }
1033:
1034: private class BundleSP implements ServiceProvider {
1035: public Object getService(ServiceBroker sb, Object requestor,
1036: Class serviceClass) {
1037: if (!BundleService.class.isAssignableFrom(serviceClass)) {
1038: return null;
1039: }
1040: BundleService.Client client = (requestor instanceof BundleService.Client ? (BundleService.Client) requestor
1041: : null);
1042: BundleServiceImpl rsi = new BundleServiceImpl(client);
1043: if (client != null) {
1044: LeaseManager.this .register(client);
1045: }
1046: return rsi;
1047: }
1048:
1049: public void releaseService(ServiceBroker sb, Object requestor,
1050: Class serviceClass, Object service) {
1051: if (!(service instanceof BundleServiceImpl)) {
1052: return;
1053: }
1054: BundleServiceImpl rsi = (BundleServiceImpl) service;
1055: BundleService.Client client = rsi.client;
1056: if (client != null) {
1057: LeaseManager.this .unregister(client);
1058: }
1059: }
1060:
1061: private class BundleServiceImpl implements BundleService {
1062: protected final Client client;
1063:
1064: public BundleServiceImpl(Client client) {
1065: this .client = client;
1066: }
1067:
1068: public Bundle getBundle(String name) {
1069: return LeaseManager.this .getBundle(name);
1070: }
1071:
1072: public Map getAllBundles() {
1073: return LeaseManager.this.getAllBundles();
1074: }
1075: }
1076: }
1077: }
|