001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2007 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.demo.mesh;
028:
029: import java.util.ArrayList;
030: import java.util.Collection;
031: import java.util.Collections;
032: import java.util.HashMap;
033: import java.util.HashSet;
034: import java.util.LinkedHashSet;
035: import java.util.List;
036: import java.util.Map;
037: import java.util.Set;
038: import org.cougaar.bootstrap.SystemProperties;
039: import org.cougaar.core.agent.service.alarm.AlarmBase;
040: import org.cougaar.core.blackboard.IncrementalSubscription;
041: import org.cougaar.core.blackboard.TodoSubscription;
042: import org.cougaar.core.mts.MessageAddress;
043: import org.cougaar.core.plugin.ComponentPlugin;
044: import org.cougaar.core.relay.SimpleRelay;
045: import org.cougaar.core.relay.SimpleRelaySource;
046: import org.cougaar.core.service.BlackboardService;
047: import org.cougaar.core.service.LoggingService;
048: import org.cougaar.core.service.UIDService;
049: import org.cougaar.util.Arguments;
050: import org.cougaar.util.UnaryPredicate;
051:
052: /**
053: * This plugin is a relay scalability test that creates an arbitrarily large
054: * "mesh" of relays.
055: * <p>
056: * For example, this plugin can be configured to create a fully-connected
057: * "star" network formation:<pre>
058: * agent "Peer0" sends to "Peer1" and "Peer2"
059: * agent "Peer1" sends to "Peer0" and "Peer2"
060: * agent "Peer2" sends to "Peer0" and "Peer1"
061: * </pre>
062: * <p>
063: * Other topologies can be created, e.g. chains, rings, trees, etc. The
064: * only requirement is that, if agent "A" lists agent "B" as a <i>target</i>,
065: * then "B" must also list "A" as a <i>target</i>.
066: * <p>
067: * Each relay iteration waits until the prior iteration has completed, making
068: * it easy to identify bottlenecks and dropped/duplicate relays. Each agent
069: * logs<br>
070: * <code>Completed all <i>N</i> iterations</code><br>
071: * once all <b>maxIterations</b> have succeeded.
072: * <p>
073: * Plugin parameters:<dl>
074: * <dt><b>targets</b>=</dt><dd>
075: * Required comma-separated targets list, which supports range expressions
076: * for easy scalability testing. For example, "Peer[0..3]" is expanded
077: * to:<pre>
078: * Peer0, Peer1, Peer2</pre>
079: * Note that this agent must be listed as one of the targets, otherwise
080: * it will not send any relays. This "self" requirement makes it easy
081: * to enable/disable many agents using a global-replace in the configuration
082: * file.</dd>
083: * <dt><b>verbose</b>=true</dt><dd>
084: * Enable verbose SHOUT logging.</dd>
085: * <dt><b>bloatSize</b>=-1</dt><dd>
086: * Number of extra bytes to bloat each message, or -1 for no added size.</dd>
087: * <dt><b>maxIterations</b>=-1</dt><dd>
088: * Maximum number of relay iterations, or -1 for no limit.</dd>
089: * <dt><b>delayMillis</b>=5000</dt><dd>
090: * Added delay between iterations, in milliseconds, or -1 for no
091: * extra delay.</dd>
092: * <dt><b>timeoutMillis</b>=30000</dt><dd>
093: * How long to wait into an iteration before logging a warning that
094: * the iteration is taking a long time, or -1 for no warnings.</dd>
095: * <dt><b>exitWhenDone</b>=false</dt><dd>
096: * Call {@link System#exit} when all <b>maxIterations</b> have been
097: * completed.</dd>
098: * </dl>
099: * <p>
100: */
101: public class MeshPlugin extends ComponentPlugin {
102:
103: private LoggingService log;
104: private UIDService uids;
105:
106: private List<String> targets;
107: private boolean verbose;
108: private int bloatSize;
109: private long maxIterations;
110: private long delayMillis;
111: private long timeoutMillis;
112: private boolean exitWhenDone;
113:
114: private long counter = -1;
115: private final Map<String, SimpleRelay> sent = new HashMap<String, SimpleRelay>();
116: private final Map<String, Long> received = new HashMap<String, Long>();
117: private int num_pending = 0;
118:
119: // time at end of our "load()" method
120: private long loadTime;
121: // time when we completed our second iteration
122: private long activeTime = -2;
123: // time when our delay-alarm will expire
124: private long delayTime = -1;
125: // time when we called "setTimeout()", or -1 if cancelled
126: private long setTime = -1;
127: // time when our timeout-alarm will expire
128: private long timeoutTime = -1;
129:
130: private IncrementalSubscription sub;
131: private TodoSubscription expiredAlarms;
132:
133: /** This method is called when the agent is created */
134: public void load() {
135: super .load();
136:
137: // get our required Cougaar services
138: log = (LoggingService) getServiceBroker().getService(this ,
139: LoggingService.class, null);
140: uids = (UIDService) getServiceBroker().getService(this ,
141: UIDService.class, null);
142:
143: // parse our plugin parameters
144: Arguments args = new Arguments(getParameters(), getClass());
145: String targets_string = args.getString("targets");
146: targets = parseTargets(targets_string, agentId.getAddress());
147: verbose = args.getBoolean("verbose", true);
148: bloatSize = args.getInt("bloatSize", -1);
149: maxIterations = args.getLong("maxIterations", -1);
150: delayMillis = args.getLong("delayMillis", 5000);
151: timeoutMillis = args.getLong("timeoutMillis", 30000);
152: exitWhenDone = args.getBoolean("exitWhenDone", false);
153:
154: if (verbose && log.isShoutEnabled()) {
155: log
156: .shout("Parsed " + targets.size() + " target"
157: + (targets.size() == 1 ? "" : "s") + ": "
158: + targets);
159: }
160:
161: loadTime = System.currentTimeMillis();
162: }
163:
164: /** This method is called when the agent starts. */
165: protected void setupSubscriptions() {
166:
167: if (targets.isEmpty()) {
168: // we're not in the targets set, so don't send anything
169: return;
170: }
171:
172: // initialize our received table
173: for (String target : targets) {
174: received.put(target, new Long(0));
175: }
176:
177: // subscribe to all relays sent to our agent
178: UnaryPredicate pred = new UnaryPredicate() {
179: public boolean execute(Object o) {
180: return ((o instanceof SimpleRelay) && (agentId
181: .equals(((SimpleRelay) o).getTarget())));
182: }
183: };
184: sub = (IncrementalSubscription) blackboard
185: .subscribe(new IncrementalSubscription(pred,
186: new HashSet()) {
187: // we need our added list to be in order!
188: protected Set createAddedSet() {
189: return new LinkedHashSet(5);
190: }
191: });
192:
193: // create a queue for expired alarms
194: expiredAlarms = (TodoSubscription) blackboard
195: .subscribe(new TodoSubscription("myAlarms"));
196:
197: if (blackboard.didRehydrate()) {
198: // restarting from an agent move or persistence snapshot
199: restoreState();
200: return;
201: }
202:
203: // send our first round of relays to our targets
204: sendNow();
205:
206: // when any target publishes a relay or one of our alarms fires,
207: // our "execute()" method will be called.
208: }
209:
210: /** This method is called whenever a subscription changes or alarm fires. */
211: protected void execute() {
212:
213: if (sub == null) {
214: // Our "execute()" is always called at least once, even if we
215: // don't have any subscriptions.
216: assert targets.isEmpty();
217: return;
218: }
219:
220: // check for expired alarms (delays & timeouts)
221: if (expiredAlarms.hasChanged()) {
222: for (Object oi : expiredAlarms.getAddedCollection()) {
223: handleAlarm((MyAlarm) oi);
224: }
225: }
226:
227: // check for incoming relays
228: if (sub.hasChanged()) {
229: for (Object oi : sub.getAddedCollection()) {
230: handleRelay((SimpleRelay) oi);
231: }
232: }
233: }
234:
235: // handle an expired alarm
236: private void handleAlarm(MyAlarm alarm) {
237: assert (delayMillis > 0 || timeoutMillis > 0);
238:
239: long now = System.currentTimeMillis();
240:
241: if (delayTime > 0 && delayTime <= now) {
242: // send our next round of relays to our targets
243: delayTime = -1;
244: sendNow();
245: }
246:
247: // check for an input timeout
248: if (timeoutTime > 0 && timeoutTime <= now) {
249: checkTimeout();
250: }
251: }
252:
253: // handle an incoming relay
254: private void handleRelay(SimpleRelay relay) {
255: String target = relay.getSource().getAddress();
256: Object new_obj = relay.getQuery();
257: if (new_obj instanceof Payload) {
258: new_obj = ((Payload) new_obj).getData();
259: }
260: Long new_value = (Long) new_obj;
261: long value = new_value.longValue();
262:
263: if (verbose && log.isShoutEnabled()) {
264: log.shout("Received " + value + " from " + target);
265: }
266:
267: if (value != (counter + 1) && value != (counter + 2)) {
268: log.error("Expecting " + (counter + 1) + " or "
269: + (counter + 2) + " from " + target + ", not "
270: + value + ", relay is " + relay);
271: return;
272: }
273:
274: Long old_obj = received.get(target);
275: if (old_obj == null) {
276: log.error("Unexpected relay from " + target + " " + relay);
277: return;
278: }
279: long old_value = old_obj.longValue();
280: if (value == (old_value + 1)) {
281: received.put(target, new_value);
282: } else {
283: // this is an error, unless we're restarting from an agent move or
284: // persistence snapshot.
285: log.warn("Unexpected value " + value + " from " + target
286: + ", expecting " + (old_value + 1) + ", relay is "
287: + relay + ". Was this agent moved or restarted?");
288: if (value > old_value) {
289: received.put(target, new_value);
290: }
291: }
292:
293: if (value != (counter + 1)) {
294: return;
295: }
296: num_pending--;
297: if (num_pending > 0) {
298: // keep waiting for more relays
299: return;
300: }
301:
302: cancelTimeout();
303:
304: // send next relay
305: if (delayMillis > 0) {
306: // set an alarm to call our "execute()" method in the future
307: sendLater();
308: } else {
309: // send our relay now
310: sendNow();
311: }
312: }
313:
314: /** Send our next relay iteration now */
315: private void sendNow() {
316: // record the timestamp of our second iteration
317: //
318: // we ignore the first iteration because it includes the naming and
319: // messaging startup costs.
320: if (activeTime < 0) {
321: if (activeTime < -1) {
322: activeTime = -1;
323: } else {
324: activeTime = System.currentTimeMillis();
325: }
326: }
327:
328: // increment counter
329: counter++;
330: if (maxIterations >= 0 && counter >= maxIterations) {
331: long now = System.currentTimeMillis();
332: log.shout("Completed all " + counter
333: + " iterations in an initial "
334: + (activeTime - loadTime) + " plus subsequent "
335: + (now - activeTime) + " milliseconds");
336: if (exitWhenDone) {
337: try {
338: System.exit(0);
339: } catch (Exception e) {
340: log.error("Unable to exit", e);
341: }
342: }
343: return;
344: }
345:
346: // delete old sent relays
347: for (SimpleRelay priorRelay : sent.values()) {
348: blackboard.publishRemove(priorRelay);
349: }
350: sent.clear();
351:
352: // send new relays
353: Object content = new Long(counter + 1);
354: if (bloatSize > 0) {
355: content = new Payload(content, bloatSize);
356: }
357: if (verbose && log.isShoutEnabled()) {
358: log.shout("Sending counter " + content + " to "
359: + targets.size() + " target"
360: + (targets.size() == 1 ? "" : "s"));
361: }
362: for (String target : targets) {
363: SimpleRelay relay = new SimpleRelaySource(uids.nextUID(),
364: agentId, MessageAddress.getMessageAddress(target),
365: content);
366:
367: sent.put(target, relay);
368:
369: blackboard.publishAdd(relay);
370: }
371:
372: updateNumPending();
373: if (num_pending <= 0) {
374: // if our delay is relatively large compared to the comms time, then
375: // all our inputs may have already arrived.
376: if (delayMillis > 0) {
377: sendLater();
378: } else {
379: log.error("None pending?");
380: }
381: return;
382: }
383:
384: // set timeout alarm
385: setTimeout();
386: }
387:
388: /** Send our next relay iteration after the non-zero delayMillis */
389: private void sendLater() {
390: if (verbose && log.isShoutEnabled()) {
391: log.shout("Will send counter " + (counter + 1) + " in "
392: + (delayMillis / 1000) + " seconds");
393: }
394: delayTime = System.currentTimeMillis() + delayMillis;
395: getAlarmService().addRealTimeAlarm(new MyAlarm(delayTime));
396: }
397:
398: private void updateNumPending() {
399: // update num_pending
400: num_pending = targets.size();
401: for (Long vi : received.values()) {
402: long value = vi.longValue();
403: if (value > counter) {
404: num_pending--;
405: }
406: }
407: }
408:
409: // The following use of alarms has been optimized!
410: //
411: // The naive solution is to create an alarm in "setTimeout()", cancel it in
412: // "cancelTimeout()", and "checkTimeout()" simply calls "handleTimeout()".
413: // The downside to this solution is that we expect our mesh iteration period
414: // to be *much* smaller than the timeout period, which will result in lots of
415: // created-then-cancelled alarms. This is wasteful.
416: //
417: // Instead, it's more efficient to keep an alarm around and not cancel it.
418: // This makes the "setTimeout()" and "cancelTimeout()" operations very fast.
419: // The minor downside is that the "checkTimeout()" method is a bit more
420: // complicated.
421: private void setTimeout() {
422: if (timeoutMillis <= 0)
423: return;
424: setTime = System.currentTimeMillis();
425: if (timeoutTime < 0) {
426: timeoutTime = setTime + timeoutMillis;
427: getAlarmService()
428: .addRealTimeAlarm(new MyAlarm(timeoutTime));
429: }
430: }
431:
432: private void cancelTimeout() {
433: if (timeoutMillis <= 0)
434: return;
435: setTime = -1;
436: }
437:
438: private void checkTimeout() {
439: assert timeoutMillis > 0;
440: long expirationTime = timeoutTime;
441: timeoutTime = -1;
442: if (setTime >= 0) {
443: long t = setTime + timeoutMillis;
444: if (expirationTime < t) {
445: timeoutTime = t;
446: getAlarmService().addRealTimeAlarm(
447: new MyAlarm(timeoutTime));
448: } else {
449: handleTimeout();
450: }
451: }
452: }
453:
454: private void handleTimeout() {
455: reportTimeout();
456: setTimeout();
457: }
458:
459: private void reportTimeout() {
460: if (!log.isWarnEnabled())
461: return;
462:
463: StringBuffer buf = new StringBuffer("Waiting for "
464: + num_pending + " of " + sent.size() + " relays {");
465: for (String target : targets) {
466: buf.append("\n ").append(target).append(" ");
467: long value = received.get(target).longValue();
468: buf.append(value);
469: if (value <= counter) {
470: buf.append(" PENDING");
471: }
472: }
473: buf.append("\n}");
474:
475: log.warn(buf.toString());
476: }
477:
478: private void restoreState() {
479: // figure out our counter by looking at our sent relays
480: UnaryPredicate pred = new UnaryPredicate() {
481: public boolean execute(Object o) {
482: return ((o instanceof SimpleRelay) && (agentId
483: .equals(((SimpleRelay) o).getSource())));
484: }
485: };
486: long sent_value = -1;
487: Collection sent_col = blackboard.query(pred);
488: for (Object si : sent_col) {
489: SimpleRelay relay = (SimpleRelay) si;
490: String target = relay.getTarget().getAddress();
491: Object obj = relay.getQuery();
492: if (obj instanceof Payload) {
493: obj = ((Payload) obj).getData();
494: }
495: long value = ((Long) obj).longValue();
496: if (verbose && log.isShoutEnabled()) {
497: log.shout("Sent " + value + " to " + target + " "
498: + relay);
499: }
500: if (value >= sent_value) {
501: sent_value = value;
502: }
503: }
504: counter = sent_value - 1;
505: if (log.isShoutEnabled()) {
506: log.shout("Restored counter at " + counter);
507: }
508:
509: // remove any old sent relays
510: for (Object si : sent_col) {
511: SimpleRelay relay = (SimpleRelay) si;
512: String target = relay.getTarget().getAddress();
513: Object obj = relay.getQuery();
514: if (obj instanceof Payload) {
515: obj = ((Payload) obj).getData();
516: }
517: long value = ((Long) obj).longValue();
518: if (value == (counter + 1)) {
519: sent.put(target, relay);
520: } else {
521: log.warn("Found stale relay to " + target
522: + " with value " + value
523: + " instead of expected " + (counter + 1)
524: + ", removing " + relay);
525: blackboard.publishRemove(relay);
526: }
527: }
528:
529: // figure out what we've received
530: for (Object oi : sub) {
531: SimpleRelay relay = (SimpleRelay) oi;
532: String target = relay.getSource().getAddress();
533: Object obj = relay.getQuery();
534: if (obj instanceof Payload) {
535: obj = ((Payload) obj).getData();
536: }
537: Long longV = (Long) obj;
538: long value = longV.longValue();
539: Long old_value = received.get(target);
540: if (verbose && log.isShoutEnabled()) {
541: log.shout("Received " + value + " from " + target
542: + " " + relay);
543: }
544: if (old_value == null || old_value.longValue() < value) {
545: received.put(target, longV);
546: }
547: }
548:
549: updateNumPending();
550: if (num_pending <= 0) {
551: // we must have been delaying between iterations.
552: if (delayMillis > 0) {
553: sendLater();
554: } else {
555: log
556: .error("Restore with zero delay found no pending relays?");
557: }
558: return;
559: }
560:
561: // we're still waiting for input, set timeout alarm
562: if (verbose && log.isShoutEnabled()) {
563: reportTimeout();
564: }
565: setTimeout();
566: }
567:
568: // parse our target list, e.g. "Peer[0..20]"
569: private static List<String> parseTargets(String s, String this Agent) {
570: if (s == null) {
571: throw new IllegalArgumentException("Must specify targets");
572: }
573: boolean containsThisAgent = false;
574: List<String> ret = new ArrayList<String>();
575: s = s.replace('\n', ' ');
576: String[] sa = s.split(",");
577: ret = new ArrayList<String>();
578: for (String si : sa) {
579: si = si.trim();
580: if (si.length() <= 0)
581: continue;
582: try {
583: int j = si.indexOf('[');
584: int k = (j >= 0 ? si.indexOf(']', j) : -1);
585: if (j <= 0 || k <= 0) {
586: // specific target name, e.g. "Foo"
587: String target = si;
588: if (target.equals(this Agent)) {
589: containsThisAgent = true;
590: } else {
591: ret.add(target);
592: }
593: continue;
594: }
595: // expand pattern, e.g. "X[0..3]Y" becomes "X0Y, X1Y, X2Y"
596: //
597: // could use a regex here
598: int q = si.indexOf("..", j);
599: if (q >= k)
600: q = -1;
601: int seq_begin;
602: int seq_end;
603: if (q < 0) {
604: String x = si.substring(j + 1, k).trim();
605: seq_begin = Integer.parseInt(x);
606: seq_end = seq_begin + 1;
607: } else {
608: String x = si.substring(j + 1, q).trim();
609: String y = si.substring(q + 2, k).trim();
610: seq_begin = Integer.parseInt(x);
611: seq_end = Integer.parseInt(y);
612: }
613: for (int index = seq_begin; index < seq_end; index++) {
614: String target = si.substring(0, j).trim() + index
615: + si.substring(k + 1).trim();
616: if (target.equals(this Agent)) {
617: containsThisAgent = true;
618: } else {
619: ret.add(target);
620: }
621: }
622: } catch (Exception e) {
623: throw new RuntimeException("Invalid target: " + si, e);
624: }
625: }
626: if (!containsThisAgent) {
627: ret = Collections.emptyList();
628: }
629: return ret;
630: }
631:
632: private class MyAlarm extends AlarmBase {
633: public MyAlarm(long futureTime) {
634: super (futureTime);
635: }
636:
637: // Put this alarm on the "expiredAlarms" queue and request an "execute()"
638: public void onExpire() {
639: expiredAlarms.add(this);
640: }
641: }
642: }
|