0001: /*
0002: * <copyright>
0003: *
0004: * Copyright 2002-2004 BBNT Solutions, LLC
0005: * under sponsorship of the Defense Advanced Research Projects
0006: * Agency (DARPA).
0007: *
0008: * You can redistribute this software and/or modify it under the
0009: * terms of the Cougaar Open Source License as published on the
0010: * Cougaar Open Source Website (www.cougaar.org).
0011: *
0012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023: *
0024: * </copyright>
0025: */
0026:
0027: package org.cougaar.core.wp.resolver;
0028:
0029: import java.util.Collections;
0030: import java.util.HashMap;
0031: import java.util.HashSet;
0032: import java.util.Iterator;
0033: import java.util.List;
0034: import java.util.Map;
0035: import java.util.Set;
0036: import org.cougaar.core.component.Service;
0037: import org.cougaar.core.component.ServiceBroker;
0038: import org.cougaar.core.component.ServiceProvider;
0039: import org.cougaar.core.component.ServiceRevokedListener;
0040: import org.cougaar.core.mts.Message;
0041: import org.cougaar.core.mts.MessageAddress;
0042: import org.cougaar.core.service.LoggingService;
0043: import org.cougaar.core.service.ThreadService;
0044: import org.cougaar.core.service.wp.WhitePagesProtectionService;
0045: import org.cougaar.core.thread.Schedulable;
0046: import org.cougaar.core.wp.MessageTimeoutUtils;
0047: import org.cougaar.core.wp.Parameters;
0048: import org.cougaar.core.wp.Timestamp;
0049: import org.cougaar.util.RarelyModifiedList;
0050:
0051: /**
0052: * This component sends and receives messages for the resolver.
0053: * <p>
0054: * This is the last outgoing stop for the resolver -- the request
0055: * wasn't in the cache and can't be batched with other already-pending
0056: * requests.
0057: * <p>
0058: * All of these properties are also component parameters by
0059: * removing the "org.cougaar.core.wp.resolver.transport."
0060: * prefix:
0061: * <pre>
0062: * @property org.cougaar.core.wp.resolver.transport.nagleMillis
0063: * Delay in milliseconds before sending messages, to improve
0064: * batching. Defaults to zero.
0065: * @property org.cougaar.core.wp.resolver.transport.noListNagle
0066: * Ignore the "nagleMillis" delay if the request is a new
0067: * name list (e.g. "list ."), which is often a user request.
0068: * Defaults to false.
0069: * @property org.cougaar.core.wp.resolver.transport.graceMillis
0070: * Extended message timeout deadline after startup. Defaults to
0071: * zero.
0072: * @property org.cougaar.core.wp.resolver.transport.checkDeadlinesPeriod
0073: * Time in milliseconds between checks for message timeouts if
0074: * there are any outstanding messages. Defaults to 10000.
0075: * </pre>
0076: */
0077: public class ClientTransport extends TransportBase {
0078:
0079: // this is a dummy address for messages that can't be
0080: // sent yet, e.g. because there are no WP servers.
0081: private static final MessageAddress NULL_ADDR = MessageTimeoutUtils
0082: .setTimeout(MessageAddress.getMessageAddress("wp-null"),
0083: 15000);
0084:
0085: private ClientTransportConfig config;
0086:
0087: private WhitePagesProtectionService protectS;
0088:
0089: private SelectService selectService;
0090:
0091: private PingSP pingSP;
0092: private LookupSP lookupSP;
0093: private ModifySP modifySP;
0094:
0095: private RarelyModifiedList pingClients = new RarelyModifiedList();
0096: private RarelyModifiedList lookupClients = new RarelyModifiedList();
0097: private RarelyModifiedList modifyClients = new RarelyModifiedList();
0098:
0099: private final SelectService.Client myClient = new SelectService.Client() {
0100: public void onChange() {
0101: ClientTransport.this .onServerChange();
0102: }
0103: };
0104:
0105: //
0106: // output (send to WP server):
0107: //
0108:
0109: private final Object myLock = new Object();
0110:
0111: // this is our startup grace-time on message timeouts, which is
0112: // based upon the time we obtained our messageSwitchService plus
0113: // the configuration's "graceMillis".
0114: //
0115: // this is used to allow more delivery time when the system is
0116: // starting, since unusual costs usually occur (e.g. cryto
0117: // handshaking).
0118: private long graceTime;
0119:
0120: // lookup requests (name => Entry) that are either being delayed
0121: // (nagle) or have been sent but not ack'ed (outstanding).
0122: //
0123: // Map<String, Entry>
0124: private final Map lookups = new HashMap();
0125:
0126: // modify requests (name => Entry) that are either being delayed
0127: // (nagle) or have been sent but not ack'ed (outstanding).
0128: //
0129: // Map<String, Entry>
0130: private final Map mods = new HashMap();
0131:
0132: // the most recent modify for this node, separately locked
0133: // to avoid a ping/select deadlock.
0134: private final Object nodeModifyLock = new Object();
0135: private Map nodeModify;
0136:
0137: // temporary fields for use in "send" and related methods.
0138: // accessed within myLock.
0139: private long now;
0140: private boolean sendNow;
0141: private boolean sendLater;
0142: private final Set lookupNames = new HashSet();
0143: private final Set modifyNames = new HashSet();
0144: private final Map lookupAddrs = new HashMap();
0145: private final Map modifyAddrs = new HashMap();
0146:
0147: // "nagle" delayed release
0148: private long releaseTime;
0149: private Schedulable releaseThread;
0150:
0151: // periodic check for late message acks
0152: private long checkDeadlinesTime;
0153: private Schedulable checkDeadlinesThread;
0154:
0155: //
0156: // statistics
0157: //
0158:
0159: private final Stats lookupStats = new Stats();
0160: private final Stats modifyStats = new Stats();
0161:
0162: public void setParameter(Object o) {
0163: configure(o);
0164: }
0165:
0166: private void configure(Object o) {
0167: if (config != null) {
0168: return;
0169: }
0170: config = new ClientTransportConfig(o);
0171: }
0172:
0173: public void load() {
0174: super .load();
0175:
0176: configure(null);
0177:
0178: if (logger.isDebugEnabled()) {
0179: logger.debug("Loading resolver remote handler");
0180: }
0181:
0182: protectS = (WhitePagesProtectionService) sb.getService(this ,
0183: WhitePagesProtectionService.class, null);
0184: if (logger.isDebugEnabled()) {
0185: logger.debug("White pages protection service: " + protectS);
0186: }
0187:
0188: // create threads
0189: if (config.nagleMillis > 0) {
0190: Runnable releaseRunner = new Runnable() {
0191: public void run() {
0192: // assert (thread == releaseThread);
0193: releaseNow();
0194: }
0195: };
0196: releaseThread = threadService.getThread(this ,
0197: releaseRunner,
0198: "White pages client \"nagle\" delayed sendler");
0199: }
0200:
0201: Runnable checkDeadlinesRunner = new Runnable() {
0202: public void run() {
0203: // assert (thread == checkDeadlinesThread);
0204: checkDeadlinesNow();
0205: }
0206: };
0207: checkDeadlinesThread = threadService.getThread(this ,
0208: checkDeadlinesRunner,
0209: "White pages client transport send queue checker");
0210:
0211: // register to select servers
0212: selectService = (SelectService) sb.getService(myClient,
0213: SelectService.class, null);
0214: if (selectService == null) {
0215: throw new RuntimeException("Unable to obtain SelectService");
0216: }
0217:
0218: // advertise our service
0219: pingSP = new PingSP();
0220: sb.addService(PingService.class, pingSP);
0221: lookupSP = new LookupSP();
0222: sb.addService(LookupService.class, lookupSP);
0223: modifySP = new ModifySP();
0224: sb.addService(ModifyService.class, modifySP);
0225: }
0226:
0227: public void unload() {
0228: if (modifySP != null) {
0229: sb.revokeService(ModifyService.class, modifySP);
0230: modifySP = null;
0231: }
0232: if (lookupSP != null) {
0233: sb.revokeService(LookupService.class, lookupSP);
0234: lookupSP = null;
0235: }
0236: if (pingSP != null) {
0237: sb.revokeService(PingService.class, pingSP);
0238: pingSP = null;
0239: }
0240:
0241: if (selectService != null) {
0242: sb.releaseService(myClient, SelectService.class,
0243: selectService);
0244: selectService = null;
0245: }
0246:
0247: if (protectS != null) {
0248: sb.releaseService(this , WhitePagesProtectionService.class,
0249: protectS);
0250: protectS = null;
0251: }
0252:
0253: super .unload();
0254: }
0255:
0256: protected void foundMessageTransport() {
0257: // super.foundMessageTransport();
0258: synchronized (myLock) {
0259: long now = System.currentTimeMillis();
0260: if (config.graceMillis >= 0) {
0261: this .graceTime = now + config.graceMillis;
0262: }
0263: // schedule a "send"
0264: checkDeadlinesTime = now;
0265: checkDeadlinesThread.start();
0266: }
0267: }
0268:
0269: private List getList(int action) {
0270: return (action == WPQuery.PING ? pingClients
0271: : action == WPQuery.LOOKUP ? lookupClients
0272: : action == WPQuery.MODIFY ? modifyClients
0273: : null);
0274: }
0275:
0276: private void register(int action, Object c) {
0277: getList(action).add(c);
0278: }
0279:
0280: private void unregister(int action, Object c) {
0281: getList(action).remove(c);
0282: }
0283:
0284: private void onServerChange() {
0285: // the list of servers has changed
0286: //
0287: // kick the thread, since either we've added a new server
0288: // (important if we had zero servers) or we've removed a
0289: // server (must revisit any messages we sent to that server).
0290: synchronized (myLock) {
0291: checkDeadlinesTime = System.currentTimeMillis();
0292: checkDeadlinesThread.start();
0293: }
0294: }
0295:
0296: //
0297: // send:
0298: //
0299:
0300: private void ping(MessageAddress addr, long deadline) {
0301: long now = System.currentTimeMillis();
0302: if (now > deadline) {
0303: // to late?
0304: return;
0305: }
0306: MessageAddress target = MessageTimeoutUtils.setDeadline(addr,
0307: deadline);
0308: // must send our node's "modify" record first, otherwise
0309: // the target can't reply!
0310: Map modObj;
0311: synchronized (nodeModifyLock) {
0312: modObj = nodeModify;
0313: }
0314: if (modObj == null) {
0315: if (logger.isWarnEnabled()) {
0316: logger.warn("Sending ping(" + addr
0317: + ") without first sending reply-to data for "
0318: + agentId);
0319: }
0320: } else {
0321: WPQuery modq = new WPQuery(agentId, target, now,
0322: WPQuery.MODIFY, modObj);
0323: sendOrQueue(modq);
0324: }
0325: WPQuery wpq = new WPQuery(agentId, target, now, WPQuery.PING,
0326: null);
0327: sendOrQueue(wpq);
0328: }
0329:
0330: private void lookup(Map m) {
0331: send(true, m);
0332: }
0333:
0334: private void modify(Map m) {
0335: send(false, m);
0336: }
0337:
0338: private void releaseNow() {
0339: // call "send" with null, which will examine the releaseTime
0340: send(true, null);
0341: }
0342:
0343: private void checkDeadlinesNow() {
0344: // call "send" with null, which will examine the checkDeadlinesTime
0345: send(false, null);
0346: }
0347:
0348: private void send(boolean lookup, Map m) {
0349: stats(lookup).send(m);
0350:
0351: // The various callers are:
0352: // - our clients (cache, leases)
0353: // - our own releaseThread (adds batching delay)
0354: // - our own check checkDeadlinesThread (check for timeouts
0355: // or new servers)
0356: // These last two clients pass a null map.
0357:
0358: // stuff we will send: (target => map(name => sendObj))
0359: Map lookupsToSend;
0360: Map modifiesToSend;
0361:
0362: // save modify-node record for ping use
0363: updateNodeModify(lookup, m);
0364:
0365: synchronized (myLock) {
0366: try {
0367: // initialize temporary variables
0368: init();
0369:
0370: // create entries for the new queries
0371: checkSendMap(lookup, m);
0372:
0373: if (!canSendMessages()) {
0374: // no MTS yet? We'll kick a thread when the MTS shows up
0375: return;
0376: }
0377:
0378: // check for delayed release entries, even if we're not the
0379: // releaseThread
0380: checkReleaseTimer();
0381:
0382: // check for message timeouts, even if we're not the
0383: // checkDeadlinesThread
0384: checkDeadlineTimer();
0385:
0386: if (!shouldReleaseNow()) {
0387: // our releaseThread will wake us later, allowing us to
0388: // batch these requests.
0389: return;
0390: }
0391:
0392: if (!collectMessagesToSend()) {
0393: // nothing to send. Another possibility is that there are
0394: // no WP servers yet, in which case we'll kick a thread when
0395: // they show up.
0396: return;
0397: }
0398:
0399: // we're sending something now, so make sure we'll wake
0400: // up later to check timeouts
0401: ensureDeadlineTimer();
0402:
0403: // take maps stuff we will send
0404: lookupsToSend = takeMessagesToSend(true);
0405: modifiesToSend = takeMessagesToSend(false);
0406: } finally {
0407: cleanup();
0408: }
0409: }
0410:
0411: // send messages
0412: sendAll(lookupsToSend, modifiesToSend);
0413: }
0414:
0415: private void init() {
0416: now = System.currentTimeMillis();
0417:
0418: sendNow = (config.nagleMillis <= 0 || (releaseTime > 0 && releaseTime <= now));
0419:
0420: sendLater = false;
0421:
0422: // these should already be cleared by "cleanup()":
0423: lookupNames.clear();
0424: modifyNames.clear();
0425: lookupAddrs.clear();
0426: modifyAddrs.clear();
0427: }
0428:
0429: private void updateNodeModify(boolean lookup, Map m) {
0430: synchronized (nodeModifyLock) {
0431: Map newM = Util.updateNodeModify(lookup, m, agentId,
0432: nodeModify);
0433: if (newM != nodeModify) {
0434: if (logger.isDetailEnabled()) {
0435: logger.detail("updated node " + agentId
0436: + " modify from " + nodeModify + " to "
0437: + newM);
0438: }
0439: nodeModify = newM;
0440: }
0441: }
0442: }
0443:
0444: private void checkSendMap(boolean lookup, Map m) {
0445: int n = (m == null ? 0 : m.size());
0446: if (n == 0) {
0447: return;
0448: }
0449:
0450: // check to see if this map contains a forced sendNow
0451: if (config.noListNagle && !sendNow) {
0452: Iterator iter = m.entrySet().iterator();
0453: for (int i = 0; i < n; i++) {
0454: Map.Entry me = (Map.Entry) iter.next();
0455: String name = (String) me.getKey();
0456: Object query = me.getValue();
0457: if (Util.mustSendNow(lookup, name, query)) {
0458: if (logger.isDetailEnabled()) {
0459: logger.detail("mustSendNow(" + lookup + ", "
0460: + name + ", " + query + ")");
0461: }
0462: sendNow = true;
0463: break;
0464: }
0465: }
0466: }
0467:
0468: Map table = (lookup ? lookups : mods);
0469: Set names = (lookup ? lookupNames : modifyNames);
0470: Iterator iter = m.entrySet().iterator();
0471: for (int i = 0; i < n; i++) {
0472: Map.Entry me = (Map.Entry) iter.next();
0473: String name = (String) me.getKey();
0474: Object query = me.getValue();
0475: Entry e = (Entry) table.get(name);
0476: // add to queue
0477: if (e != null
0478: && !shouldSend(lookup, name, query, e.getQuery())) {
0479: continue;
0480: }
0481: // add or replace the entry
0482: e = new Entry(query, now);
0483: table.put(name, e);
0484: if (sendNow) {
0485: names.add(name);
0486: continue;
0487: }
0488: sendLater = true;
0489: if (logger.isDetailEnabled()) {
0490: logger.detail("delaying initial release of "
0491: + (lookup ? "lookup" : "modify") + " " + name
0492: + "=" + query);
0493: }
0494: stats(lookup).later();
0495: }
0496: }
0497:
0498: private boolean shouldSend(boolean lookup, String name,
0499: Object query, Object sentObj) {
0500: try {
0501: if (Util.shouldSend(lookup, name, query, sentObj)) {
0502: return true;
0503: }
0504: } catch (Exception err) {
0505: if (logger.isErrorEnabled()) {
0506: logger.error("shouldSend failed", err);
0507: }
0508: }
0509: if (logger.isDebugEnabled()) {
0510: logger.debug("Not sending "
0511: + (lookup ? "lookup" : "modify") + " (name=" + name
0512: + " query=" + query
0513: + "), since we've already sent " + sentObj);
0514: }
0515: return false;
0516: }
0517:
0518: private boolean canSendMessages() {
0519: if (hasMessageTransport()) {
0520: return true;
0521: }
0522: if (logger.isDetailEnabled()) {
0523: logger.detail("waiting for message transport");
0524: }
0525: return false;
0526: }
0527:
0528: private void checkReleaseTimer() {
0529: if (!sendNow || config.nagleMillis <= 0) {
0530: return;
0531: }
0532: if (releaseTime > 0) {
0533: // timer is due
0534: releaseTime = 0;
0535: // cancel the timer. This is a no-op if it's our thread.
0536: releaseThread.cancelTimer();
0537: }
0538: // find due entries (optimize me?)
0539: for (int t = 0; t < 2; t++) {
0540: boolean tlookup = (t == 0);
0541: Map table = (tlookup ? lookups : mods);
0542: int tsize = table.size();
0543: if (tsize <= 0) {
0544: continue;
0545: }
0546: Set names = (tlookup ? lookupNames : modifyNames);
0547: Iterator iter = table.entrySet().iterator();
0548: for (int i = 0; i < tsize; i++) {
0549: Map.Entry me = (Map.Entry) iter.next();
0550: String name = (String) me.getKey();
0551: Entry e = (Entry) me.getValue();
0552: if (e.getTarget() != null) {
0553: continue;
0554: }
0555: names.add(name);
0556: }
0557: }
0558: }
0559:
0560: private void checkDeadlineTimer() {
0561: if (checkDeadlinesTime <= 0 || checkDeadlinesTime > now) {
0562: return;
0563: }
0564: // timer is due
0565: checkDeadlinesTime = 0;
0566: // now's a good time to dump debugging info
0567: debugQueues();
0568: boolean anyStillPending = false;
0569: // find due entries (optimize me?)
0570: for (int t = 0; t < 2; t++) {
0571: boolean tlookup = (t == 0);
0572: Map table = (tlookup ? lookups : mods);
0573: int tsize = table.size();
0574: if (tsize <= 0) {
0575: continue;
0576: }
0577: Set names = (tlookup ? lookupNames : modifyNames);
0578: Iterator iter = table.entrySet().iterator();
0579: for (int i = 0; i < tsize; i++) {
0580: Map.Entry me = (Map.Entry) iter.next();
0581: String name = (String) me.getKey();
0582: Entry e = (Entry) me.getValue();
0583: MessageAddress target = e.getTarget();
0584: if (target == null && sendNow && config.nagleMillis > 0) {
0585: // waiting for releaseThread
0586: continue;
0587: }
0588: if (target != null && target != NULL_ADDR) {
0589: if (selectService.contains(target)) {
0590: long deadline = e.getDeadline();
0591: if (deadline <= 0 || deadline > now) {
0592: // give it more time for the ack
0593: anyStillPending = true;
0594: continue;
0595: }
0596: if (shortcutNodeModify(tlookup, name, e, now)) {
0597: // unusual case: local-node uid-based modify
0598: continue;
0599: }
0600: }
0601: // update server stats
0602: selectService.update(target,
0603: (now - e.getSendTime()), true);
0604: }
0605: if (!sendNow && logger.isDetailEnabled()) {
0606: logger.detail("delaying retry release of "
0607: + (tlookup ? "lookup" : "modify") + " "
0608: + name + "=" + e.getQuery() + ", entry="
0609: + e.toString(now));
0610: }
0611: stats(tlookup).retry();
0612: e.setTarget(null);
0613: if (sendNow) {
0614: names.add(name);
0615: continue;
0616: }
0617: sendLater = true;
0618: stats(tlookup).later();
0619: }
0620: }
0621: if (anyStillPending) {
0622: // schedule our next deadline check
0623: ensureDeadlineTimer();
0624: }
0625: }
0626:
0627: /**
0628: * Special test for local-node uid-based modify requests.
0629: */
0630: private boolean shortcutNodeModify(boolean lookup, String name,
0631: Entry e, long now) {
0632: // replace with modify(ourNodeModify)
0633: Object query = e.getQuery();
0634: Object answer = Util.shortcutNodeModify(lookup, agentId, name,
0635: query);
0636: if (answer == null) {
0637: return false;
0638: }
0639: Map m = Collections.singletonMap(name, answer);
0640: WPAnswer wpa = new WPAnswer(e.getTarget(), // from the server
0641: agentId, // back to us
0642: e.getSendTime(), // our sendTime
0643: now, // the "server" sendTime
0644: true, // use the above time
0645: WPAnswer.MODIFY, // modify
0646: m); // the lease-not-known answer
0647: if (logger.isInfoEnabled()) {
0648: logger.info("Timeout waiting for uid-based modify response"
0649: + " (name=" + name + " query=" + query
0650: + "), pretending that the server"
0651: + " sent back a lease-not-known response: " + wpa);
0652: }
0653: receive(wpa);
0654: return true;
0655: }
0656:
0657: private boolean shouldReleaseNow() {
0658: if (sendNow || !sendLater) {
0659: return true;
0660: }
0661: // make sure timer is running to send later
0662: if (releaseTime == 0) {
0663: // start timer
0664: releaseTime = now + config.nagleMillis;
0665: if (logger.isDetailEnabled()) {
0666: logger.detail("starting delayed release timer");
0667: }
0668: releaseThread.schedule(config.nagleMillis);
0669: }
0670: // wait for timer
0671: if (logger.isDetailEnabled()) {
0672: logger.detail("waiting " + (releaseTime - now)
0673: + " for release timer");
0674: }
0675: return false;
0676: }
0677:
0678: private boolean collectMessagesToSend() {
0679: boolean anyToSend = false;
0680: for (int x = 0; x < 2; x++) {
0681: boolean xlookup = (x == 0);
0682: Set names = (xlookup ? lookupNames : modifyNames);
0683: if (names.isEmpty()) {
0684: continue;
0685: }
0686: Iterator iter = names.iterator();
0687: for (int i = 0, nsize = names.size(); i < nsize; i++) {
0688: String name = (String) iter.next();
0689: Map table = (xlookup ? lookups : mods);
0690: Entry e = (Entry) table.get(name);
0691: // accessing the "selectService" within our lock may be an
0692: // issue someday, but for now we'll assume it's allowed
0693: MessageAddress target = selectService.select(xlookup,
0694: name);
0695: if (target == null) {
0696: // no target? mark entry
0697: e.setTarget(NULL_ADDR);
0698: if (logger.isDetailEnabled()) {
0699: logger
0700: .detail("queuing message until WP servers are available: "
0701: + (xlookup ? "lookup"
0702: : "modify")
0703: + " "
0704: + name + "=" + e.toString(now));
0705: }
0706: continue;
0707: }
0708: e.setTarget(target);
0709:
0710: // wrap query for security
0711: Object query = e.getQuery();
0712: Object sendObj = query;
0713: if (query != null) {
0714: sendObj = wrapQuery(xlookup, name, query);
0715: if (sendObj == null) {
0716: // wrapping rejected this query
0717: table.remove(name);
0718: continue;
0719: }
0720: }
0721:
0722: anyToSend = true;
0723:
0724: // set timestamps
0725: e.setSendTime(now);
0726: long deadline = MessageTimeoutUtils.getDeadline(target);
0727: if (deadline > 0 && graceTime > 0
0728: && graceTime > deadline) {
0729: // extend deadline to match initial "grace" period
0730: deadline = graceTime;
0731: }
0732: e.setDeadline(deadline);
0733:
0734: // add to (target => map(name => sendObj))
0735: Map xaddrs = (xlookup ? lookupAddrs : modifyAddrs);
0736: if (nsize == 1) {
0737: // minor optimization for single-element map
0738: xaddrs.put(target, Collections.singletonMap(name,
0739: sendObj));
0740: break;
0741: }
0742: Map addrMap = (Map) xaddrs.get(target);
0743: if (addrMap == null) {
0744: addrMap = new HashMap();
0745: xaddrs.put(target, addrMap);
0746: }
0747: // assert (!addrMap.containsKey(name));
0748: addrMap.put(name, sendObj);
0749: }
0750: }
0751:
0752: return anyToSend;
0753: }
0754:
0755: private void ensureDeadlineTimer() {
0756: if (checkDeadlinesTime > 0) {
0757: return;
0758: }
0759: // schedule our next deadline check
0760: checkDeadlinesTime = now + config.checkDeadlinesPeriod;
0761: if (logger.isDetailEnabled()) {
0762: logger
0763: .detail("will send messages, scheduling timer to check deadlines");
0764: }
0765: checkDeadlinesThread.schedule(config.checkDeadlinesPeriod);
0766: }
0767:
0768: private Map takeMessagesToSend(boolean lookup) {
0769: Map addrs = (lookup ? lookupAddrs : modifyAddrs);
0770: int n = addrs.size();
0771: if (n == 0) {
0772: return null;
0773: }
0774: if (n == 1) {
0775: Iterator iter = addrs.entrySet().iterator();
0776: Map.Entry me = (Map.Entry) iter.next();
0777: return Collections.singletonMap(me.getKey(), me.getValue());
0778: }
0779: return new HashMap(addrs);
0780: }
0781:
0782: private void cleanup() {
0783: now = 0;
0784: sendNow = false;
0785: sendLater = false;
0786: lookupNames.clear();
0787: modifyNames.clear();
0788: lookupAddrs.clear();
0789: modifyAddrs.clear();
0790: }
0791:
0792: private void sendAll(Map lookupsToSend, Map modifiesToSend) {
0793: // send messages
0794: //
0795: // send the modifications first, so a lookup that matches our
0796: // own modifications will see our modifications instead of
0797: // the pre-modification state.
0798: //
0799: // we send the lookups and modifies separately, even if they're
0800: // going to the same target. We lose some of our batching, but
0801: // this simplfies the security message-content checks.
0802: long now = System.currentTimeMillis();
0803: sendAll(false, modifiesToSend, now);
0804: sendAll(true, lookupsToSend, now);
0805: }
0806:
0807: private void sendAll(boolean lookup, Map addrMap, long now) {
0808: stats(lookup).sendAll(addrMap);
0809: int n = (addrMap == null ? 0 : addrMap.size());
0810: if (n == 0) {
0811: return;
0812: }
0813: Iterator iter = addrMap.entrySet().iterator();
0814: for (int i = 0; i < n; i++) {
0815: Map.Entry me = (Map.Entry) iter.next();
0816: MessageAddress target = (MessageAddress) me.getKey();
0817: Map map = (Map) me.getValue();
0818: send(lookup, target, map, now);
0819: }
0820: }
0821:
0822: private void send(boolean lookup, MessageAddress target, Map map,
0823: long now) {
0824: if (target == NULL_ADDR) {
0825: if (logger.isDetailEnabled()) {
0826: logger
0827: .detail("queuing message until WP servers are available: "
0828: + (lookup ? "lookup" : "modify")
0829: + " "
0830: + map);
0831: }
0832: } else {
0833: WPQuery wpq = new WPQuery(agentId, target, now,
0834: (lookup ? WPQuery.LOOKUP : WPQuery.MODIFY), map);
0835: if (logger.isDetailEnabled()) {
0836: logger.detail("sending message: " + wpq);
0837: }
0838: sendOrQueue(wpq);
0839: }
0840: }
0841:
0842: private Object wrapQuery(boolean lookup, String name, Object query) {
0843: if (lookup || protectS == null) {
0844: return query;
0845: }
0846: // wrap sendObj using protection service
0847: String agent;
0848: if (query instanceof NameTag) {
0849: agent = ((NameTag) query).getName();
0850: } else {
0851: agent = agentId.getAddress();
0852: }
0853: WhitePagesProtectionService.Wrapper wrapper;
0854: try {
0855: wrapper = protectS.wrap(agent, query);
0856: if (wrapper == null) {
0857: throw new RuntimeException("Wrap returned null");
0858: }
0859: } catch (Exception e) {
0860: if (logger.isErrorEnabled()) {
0861: logger.error("Unable to wrap (agent=" + agent
0862: + " name=" + name + " query=" + query + ")", e);
0863: }
0864: wrapper = null;
0865: }
0866: Object ret = new NameTag(agent, wrapper);
0867: if (logger.isDetailEnabled()) {
0868: logger.detail("wrapped (agent=" + agent + " name=" + name
0869: + " query=" + query + ") to " + ret);
0870: }
0871: return ret;
0872: }
0873:
0874: //
0875: // receive:
0876: //
0877:
0878: protected boolean shouldReceive(Message m) {
0879: if (m instanceof WPAnswer) {
0880: WPAnswer wpa = (WPAnswer) m;
0881: int action = wpa.getAction();
0882: return (action == WPAnswer.LOOKUP
0883: || action == WPAnswer.MODIFY || action == WPAnswer.PING);
0884: }
0885: return false;
0886: }
0887:
0888: protected void receiveNow(Message msg) {
0889: if (logger.isDetailEnabled()) {
0890: logger.detail("receiving message: " + msg);
0891: }
0892:
0893: WPAnswer wpa = (WPAnswer) msg;
0894: int action = wpa.getAction();
0895:
0896: MessageAddress addr = wpa.getOriginator();
0897: long sendTime = wpa.getSendTime();
0898: long now = System.currentTimeMillis();
0899: long rtt = (now - sendTime);
0900:
0901: if (action == WPAnswer.PING) {
0902: List l = pingClients.getUnmodifiableList();
0903: for (int i = 0, ln = l.size(); i < ln; i++) {
0904: PingService.Client c = (PingService.Client) l.get(i);
0905: c.pingAnswer(addr, rtt);
0906: }
0907: return;
0908: }
0909:
0910: boolean lookup = (action == WPAnswer.LOOKUP);
0911: Map m = wpa.getMap();
0912:
0913: stats(lookup).receiveNow(m);
0914:
0915: int n = (m == null ? 0 : m.size());
0916: if (n == 0) {
0917: return;
0918: }
0919:
0920: long replyTime = wpa.getReplyTime();
0921: boolean useServerTime = wpa.useServerTime();
0922:
0923: Map answerMap = null;
0924:
0925: // remove from pending queue
0926: synchronized (myLock) {
0927: Iterator iter = m.entrySet().iterator();
0928: for (int i = 0; i < n; i++) {
0929: Map.Entry me = (Map.Entry) iter.next();
0930: String name = (String) me.getKey();
0931: Object answer = me.getValue();
0932: // tell a queue
0933: if (!shouldReceive(lookup, addr, name, answer, now)) {
0934: continue;
0935: }
0936: if (n == 1) {
0937: answerMap = m;
0938: continue;
0939: }
0940: // add to the per-name map
0941: if (answerMap == null) {
0942: answerMap = new HashMap();
0943: }
0944: answerMap.put(name, answer);
0945: }
0946:
0947: if (answerMap == null) {
0948: return;
0949: }
0950:
0951: // reward the server
0952: selectService.update(addr, rtt, false);
0953: }
0954:
0955: // compute the base time
0956: long baseTime;
0957: if (useServerTime) {
0958: // use the server's clock
0959: baseTime = replyTime;
0960: } else {
0961: // use a round-trip-time estimate as defined in WPAnswer
0962: baseTime = sendTime + (rtt >> 1);
0963: }
0964:
0965: stats(lookup).accept(answerMap);
0966:
0967: // tell our clients
0968: if (lookup) {
0969: List l = lookupClients.getUnmodifiableList();
0970: for (int i = 0, ln = l.size(); i < ln; i++) {
0971: LookupService.Client c = (LookupService.Client) l
0972: .get(i);
0973: c.lookupAnswer(baseTime, answerMap);
0974: }
0975: } else {
0976: List l = modifyClients.getUnmodifiableList();
0977: for (int i = 0, ln = l.size(); i < ln; i++) {
0978: ModifyService.Client c = (ModifyService.Client) l
0979: .get(i);
0980: c.modifyAnswer(baseTime, answerMap);
0981: }
0982: }
0983: }
0984:
0985: /**
0986: * Figure out if we should accept this request response, including
0987: * whether or not we sent it and any necessary ordering/version
0988: * tests.
0989: */
0990: private boolean shouldReceive(boolean lookup, MessageAddress addr,
0991: String name, Object answer, long now) {
0992: // assert (Thread.holdsLock(myLock));
0993:
0994: Map table = (lookup ? lookups : mods);
0995:
0996: boolean accepted;
0997: Entry e = (Entry) table.get(name);
0998: if (e == null) {
0999: // not sent?
1000: accepted = false;
1001: } else {
1002: Object sentObj = e.getQuery();
1003: accepted = Util
1004: .shouldReceive(lookup, name, answer, sentObj);
1005: if (accepted) {
1006: // clear the table entry
1007: table.remove(name);
1008: }
1009: }
1010:
1011: if (logger.isInfoEnabled()) {
1012: logger.info((accepted ? "Accepting" : "Ignoring") + " "
1013: + (lookup ? "lookup" : "modify")
1014: + " response (name=" + name + ", answer=" + answer
1015: + ") returned by " + addr + ", since it "
1016: + (accepted ? "matches" : "doesn't match")
1017: + " our sent query: "
1018: + (e == null ? "<null>" : e.toString(now)));
1019: }
1020:
1021: return accepted;
1022: }
1023:
1024: //
1025: // debug printer:
1026: //
1027:
1028: private Stats stats(boolean lookup) {
1029: return (lookup ? lookupStats : modifyStats);
1030: }
1031:
1032: private void debugQueues() {
1033: if (!logger.isDebugEnabled()) {
1034: return;
1035: }
1036:
1037: // stats
1038: logger.debug("header, agent, " + stats(true).getHeader());
1039: logger.debug("lookup, " + agentId + ", "
1040: + stats(true).getStats());
1041: logger.debug("modify, " + agentId + ", "
1042: + stats(false).getStats());
1043:
1044: String currentServers = selectService.toString();
1045: synchronized (myLock) {
1046: String s = "";
1047: s += "\n##### client transport output queue #######################";
1048: s += "\nservers=" + currentServers;
1049: long now = System.currentTimeMillis();
1050: boolean firstPass = true;
1051: while (true) {
1052: Map m = (firstPass ? lookups : mods);
1053: int n = m.size();
1054: s += "\n" + (firstPass ? "lookup" : "modify") + "[" + n
1055: + "]: ";
1056: if (n > 0) {
1057: for (Iterator iter = m.entrySet().iterator(); iter
1058: .hasNext();) {
1059: Map.Entry me = (Map.Entry) iter.next();
1060: String name = (String) me.getKey();
1061: Entry e = (Entry) me.getValue();
1062: s += "\n " + name + "\t => "
1063: + e.toString(now);
1064: }
1065: }
1066: if (firstPass) {
1067: firstPass = false;
1068: } else {
1069: break;
1070: }
1071: }
1072: s += "\n###########################################################";
1073: logger.debug(s);
1074: }
1075: }
1076:
1077: //
1078: // classes:
1079: //
1080:
1081: private static class Entry {
1082:
1083: private final Object query;
1084:
1085: private final long creationTime;
1086:
1087: private long sendTime;
1088: private long deadline;
1089: private MessageAddress target;
1090:
1091: public Entry(Object query, long now) {
1092: this .query = query;
1093: this .creationTime = now;
1094: }
1095:
1096: public Object getQuery() {
1097: return query;
1098: }
1099:
1100: public long getCreationTime() {
1101: return creationTime;
1102: }
1103:
1104: public long getSendTime() {
1105: return sendTime;
1106: }
1107:
1108: public void setSendTime(long sendTime) {
1109: this .sendTime = sendTime;
1110: }
1111:
1112: public long getDeadline() {
1113: return deadline;
1114: }
1115:
1116: public void setDeadline(long deadline) {
1117: this .deadline = deadline;
1118: }
1119:
1120: public MessageAddress getTarget() {
1121: return target;
1122: }
1123:
1124: public void setTarget(MessageAddress target) {
1125: this .target = target;
1126: }
1127:
1128: public String toString() {
1129: long now = System.currentTimeMillis();
1130: return toString(now);
1131: }
1132:
1133: public String toString(long now) {
1134: return "(created="
1135: + Timestamp.toString(getCreationTime(), now)
1136: + " sent=" + Timestamp.toString(getSendTime(), now)
1137: + " deadline="
1138: + Timestamp.toString(getDeadline(), now)
1139: + " target=" + getTarget() + " query=" + getQuery()
1140: + ")";
1141: }
1142: }
1143:
1144: private static class Stats {
1145:
1146: private final Object lock = new Object();
1147:
1148: private int count;
1149: private int size;
1150: private int later;
1151: private int sendCount;
1152: private int sendSize;
1153: private int retrySize;
1154: private int receiveCount;
1155: private int receiveSize;
1156: private int acceptCount;
1157: private int acceptSize;
1158:
1159: private String getHeader() {
1160: return "count" + ", size" + ", later" + ", sendC"
1161: + ", sendS" + ", retryS" + ", recvC" + ", recvS"
1162: + ", accC" + ", accS";
1163: }
1164:
1165: private String getStats() {
1166: synchronized (lock) {
1167: return count + ", " + size + ", " + later + ", "
1168: + sendCount + ", " + sendSize + ", "
1169: + retrySize + ", " + receiveCount + ", "
1170: + receiveSize + ", " + acceptCount + ", "
1171: + acceptSize;
1172: }
1173: }
1174:
1175: private void send(Map m) {
1176: int s = (m == null ? 0 : m.size());
1177: if (s <= 0) {
1178: return;
1179: }
1180: synchronized (lock) {
1181: count++;
1182: size += s;
1183: }
1184: }
1185:
1186: private void later() {
1187: synchronized (lock) {
1188: later++;
1189: }
1190: }
1191:
1192: private void sendAll(Map addrMap) {
1193: int n = (addrMap == null ? 0 : addrMap.size());
1194: if (n <= 0) {
1195: return;
1196: }
1197: synchronized (lock) {
1198: sendCount += n;
1199: int s = 0;
1200: Iterator iter = addrMap.entrySet().iterator();
1201: for (int i = 0; i < n; i++) {
1202: Map.Entry me = (Map.Entry) iter.next();
1203: Map m = (Map) me.getValue();
1204: s += m.size();
1205: }
1206: sendSize += s;
1207: }
1208: }
1209:
1210: private void retry() {
1211: synchronized (lock) {
1212: retrySize++;
1213: }
1214: }
1215:
1216: private void receiveNow(WPAnswer wpa) {
1217: synchronized (lock) {
1218: if (wpa == null) {
1219: return;
1220: }
1221: Map m = wpa.getMap();
1222: receiveNow(m);
1223: }
1224: }
1225:
1226: private void receiveNow(Map m) {
1227: synchronized (lock) {
1228: receiveCount++;
1229: int n = (m == null ? 0 : m.size());
1230: receiveSize += n;
1231: }
1232: }
1233:
1234: private void accept(Map answerMap) {
1235: synchronized (lock) {
1236: acceptCount++;
1237: int n = (answerMap == null ? 0 : answerMap.size());
1238: acceptSize += n;
1239: }
1240: }
1241: }
1242:
1243: private abstract class SPBase extends ServiceProviderBase {
1244: protected abstract int getAction();
1245:
1246: protected void register(Object client) {
1247: ClientTransport.this .register(getAction(), client);
1248: }
1249:
1250: protected void unregister(Object client) {
1251: ClientTransport.this .unregister(getAction(), client);
1252: }
1253: }
1254:
1255: private class PingSP extends SPBase {
1256: protected int getAction() {
1257: return WPQuery.PING;
1258: }
1259:
1260: protected Class getServiceClass() {
1261: return PingService.class;
1262: }
1263:
1264: protected Class getClientClass() {
1265: return PingService.Client.class;
1266: }
1267:
1268: protected Service getService(Object client) {
1269: return new SI(client);
1270: }
1271:
1272: protected class SI extends MyServiceImpl implements PingService {
1273: public SI(Object client) {
1274: super (client);
1275: }
1276:
1277: public void ping(MessageAddress addr, long deadline) {
1278: ClientTransport.this .ping(addr, deadline);
1279: }
1280: }
1281: }
1282:
1283: private class LookupSP extends SPBase {
1284: protected int getAction() {
1285: return WPQuery.LOOKUP;
1286: }
1287:
1288: protected Class getServiceClass() {
1289: return LookupService.class;
1290: }
1291:
1292: protected Class getClientClass() {
1293: return LookupService.Client.class;
1294: }
1295:
1296: protected Service getService(Object client) {
1297: return new SI(client);
1298: }
1299:
1300: protected class SI extends MyServiceImpl implements
1301: LookupService {
1302: public SI(Object client) {
1303: super (client);
1304: }
1305:
1306: public void lookup(Map m) {
1307: ClientTransport.this .lookup(m);
1308: }
1309: }
1310: }
1311:
1312: private class ModifySP extends SPBase {
1313: protected int getAction() {
1314: return WPQuery.MODIFY;
1315: }
1316:
1317: protected Class getServiceClass() {
1318: return ModifyService.class;
1319: }
1320:
1321: protected Class getClientClass() {
1322: return ModifyService.Client.class;
1323: }
1324:
1325: protected Service getService(Object client) {
1326: return new SI(client);
1327: }
1328:
1329: protected class SI extends MyServiceImpl implements
1330: ModifyService {
1331: public SI(Object client) {
1332: super (client);
1333: }
1334:
1335: public void modify(Map m) {
1336: ClientTransport.this .modify(m);
1337: }
1338: }
1339: }
1340:
1341: /** config options */
1342: private static class ClientTransportConfig {
1343: public final long nagleMillis;
1344: public final boolean noListNagle;
1345: public final long checkDeadlinesPeriod;
1346: public final long graceMillis;
1347:
1348: public ClientTransportConfig(Object o) {
1349: Parameters p = new Parameters(o,
1350: "org.cougaar.core.wp.resolver.transport.");
1351: nagleMillis = p.getLong("nagleMillis", 0);
1352: noListNagle = p.getBoolean("noListNagle", false);
1353: checkDeadlinesPeriod = p.getLong("checkDeadlinesPeriod",
1354: 10000);
1355: graceMillis = p.getLong("graceMillis", 0);
1356: }
1357: }
1358: }
|