001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2006 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.demo.ping;
028:
029: import java.util.Iterator;
030:
031: import org.cougaar.bootstrap.SystemProperties;
032: import org.cougaar.core.agent.service.alarm.Alarm;
033: import org.cougaar.core.agent.service.alarm.AlarmBase;
034: import org.cougaar.core.blackboard.IncrementalSubscription;
035: import org.cougaar.core.blackboard.TodoSubscription;
036: import org.cougaar.core.mts.MessageAddress;
037: import org.cougaar.core.plugin.ComponentPlugin;
038: import org.cougaar.core.relay.SimpleRelay;
039: import org.cougaar.core.relay.SimpleRelaySource;
040: import org.cougaar.core.service.LoggingService;
041: import org.cougaar.core.service.UIDService;
042: import org.cougaar.util.Arguments;
043: import org.cougaar.util.UnaryPredicate;
044:
045: /**
046: * This plugin is an example ping source that sends relays to a remote agent.
047: * <p>
048: * There can be multiple copies of this plugin in a single agent, but
049: * every {@link PingSender} must have a unique target. The target is
050: * specified as a plugin parameter:
051: * <dl>
052: * <dt>target=<i>String</i></dt>
053: * <dd>Required remote agent name. If the agent doesn't exist then we
054: * wait forever -- there's no alarm-based timeout in this plugin
055: * implementation.</dd></p>
056: *
057: * <dt>delayMillis=<i>long</i></dt>
058: * <dd>Delay milliseconds between relay iterations. Set the delay to zero
059: * to run the pings as fast as possible.</dd><p>
060: *
061: * <dt>verbose=<i>boolean</i></dt>
062: * <dd>Output SHOUT-level logging messages. This can also be disabled
063: * by modifying the Cougaar logging configuration to set:<pre>
064: * log4j.category.org.cougaar.demo.ping.PingSender=FATAL
065: * log4j.category.org.cougaar.demo.ping.PingReceiver=FATAL
066: * </pre>
067: * For simplicity we support this as a plugin parameter, so new users
068: * don't need to configure the logging service. If enabled, also
069: * consider turning off "+/-" message send/receive logging by
070: * setting:<pre>
071: * -Dorg.cougaar.core.agent.quiet=true
072: * </pre></dd><p>
073: * </dl>
074: *
075: * @property org.cougaar.demo.ping.delayMillis=5000
076: * PingSender delay between ping iterations, if not set as a plugin
077: * parameter.
078: *
079: * @property org.cougaar.demo.ping.verbose=true
080: * PingSender should output SHOUT-level logging messages, if not set as
081: * a plugin parameter.
082: *
083: * @see PingReceiver Required plugin for every agent that will receive
084: * ping relays.
085: *
086: * @see PingServlet Optional browser-based GUI.
087: */
088: public class PingSender extends ComponentPlugin {
089:
090: private static final long DEFAULT_DELAY_MILLIS = SystemProperties
091: .getLong("org.cougaar.demo.ping.delayMillis", 5000);
092:
093: private static final boolean DEFAULT_VERBOSE = SystemProperties
094: .getBoolean("org.cougaar.demo.ping.verbose", true);
095:
096: private LoggingService log;
097: private UIDService uids;
098:
099: private MessageAddress target;
100: private long delayMillis;
101: private boolean verbose;
102:
103: private IncrementalSubscription sub;
104: private TodoSubscription expiredAlarms;
105:
106: /** This method is called when the agent is created */
107: public void load() {
108: super .load();
109:
110: // Get our required Cougaar services
111: log = (LoggingService) getServiceBroker().getService(this ,
112: LoggingService.class, null);
113: uids = (UIDService) getServiceBroker().getService(this ,
114: UIDService.class, null);
115:
116: // Parse our plugin parameters
117: Arguments args = new Arguments(getParameters());
118: String target_name = args.getString("target", null);
119: target = MessageAddress.getMessageAddress(target_name);
120: if (target == null) {
121: throw new IllegalArgumentException("Must specify a target");
122: } else if (target.equals(agentId)) {
123: throw new IllegalArgumentException("Target matches self: "
124: + target);
125: }
126: delayMillis = args.getLong("delayMillis", DEFAULT_DELAY_MILLIS);
127: verbose = args.getBoolean("verbose", DEFAULT_VERBOSE);
128: }
129:
130: /** This method is called when the agent starts. */
131: protected void setupSubscriptions() {
132:
133: // Create a holder for alarms that have come due
134: //
135: // The "myAlarms" string is any arbitrary identifier, and would only be
136: // significant if we made more than one TodoSubscription instance.
137: if (delayMillis > 0) {
138: expiredAlarms = (TodoSubscription) blackboard
139: .subscribe(new TodoSubscription("myAlarms"));
140: }
141:
142: // Subscribe to all relays sent by our agent
143: sub = (IncrementalSubscription) blackboard
144: .subscribe(createPredicate());
145:
146: // Get our initial counter value, which is zero unless we're restarting
147: // from an agent move or persistence snapshot
148: int counter = getInitialCounter();
149:
150: // Send our first relay to our target
151: sendNow(null, new Integer(counter));
152:
153: // When our target publishes a response, our "execute()" method will
154: // be called.
155: }
156:
157: /** This method is called whenever a subscription changes. */
158: protected void execute() {
159: // Observe changed relays by looking at our subscription's change list
160: if (sub.hasChanged()) {
161: for (Iterator iter = sub.getChangedCollection().iterator(); iter
162: .hasNext();) {
163: SimpleRelay relay = (SimpleRelay) iter.next();
164: handleResponse(relay);
165: }
166: }
167:
168: // If we're using a delay, check to see if it is time to send the
169: // next ping iteration
170: if (delayMillis > 0 && expiredAlarms.hasChanged()) {
171: for (Iterator iter = expiredAlarms.getAddedCollection()
172: .iterator(); iter.hasNext();) {
173: MyAlarm alarm = (MyAlarm) iter.next();
174: handleAlarm(alarm);
175: }
176: }
177: }
178:
179: /** Create our subscription filter */
180: private UnaryPredicate createPredicate() {
181: // Match any relay sent by our agent and to our specific target,
182: // in case this agent contains multiple senders to different targets.
183: return new UnaryPredicate() {
184: public boolean execute(Object o) {
185: if (o instanceof SimpleRelay) {
186: SimpleRelay relay = (SimpleRelay) o;
187: if (agentId.equals(relay.getSource())
188: && target.equals(relay.getTarget())) {
189: return true;
190: }
191: }
192: return false;
193: }
194: };
195: }
196:
197: /** Get our initial ping iteration counter value */
198: private int getInitialCounter() {
199: // Check to see if we've already sent a ping, in case we're restarting
200: // from an agent move or persistence snapshot.
201: int ret = 0;
202: if (blackboard.didRehydrate()) {
203: // Get the counter from our sent ping, if any, then remove it
204: for (Iterator iter = sub.iterator(); iter.hasNext();) {
205: SimpleRelay relay = (SimpleRelay) iter.next();
206: ret = ((Integer) relay.getQuery()).intValue();
207: blackboard.publishRemove(relay);
208: }
209: if (verbose && log.isShoutEnabled()) {
210: log.shout("Resuming pings to " + target
211: + " at counter " + ret);
212: }
213: }
214: return ret;
215: }
216:
217: /** Handle a response to a ping relay that we sent */
218: private void handleResponse(SimpleRelay relay) {
219: // Print the target's response
220: if (verbose && log.isShoutEnabled()) {
221: log.shout("Received response " + relay.getReply()
222: + " from " + target);
223: }
224:
225: // Figure out our next content value
226: //
227: // For scalability testing we could make this a large byte array.
228: Integer old_content = (Integer) relay.getQuery();
229: Integer new_content = new Integer(old_content.intValue() + 1);
230:
231: if (delayMillis > 0) {
232: // Set an alarm to call our "execute()" method in the future
233: sendLater(relay, new_content);
234: } else {
235: // Send our relay now
236: sendNow(relay, new_content);
237: }
238: }
239:
240: /**
241: * Wake up from an alarm to send our next relay iteration (only use if
242: * delayMillis is greater than zero),
243: */
244: private void handleAlarm(MyAlarm alarm) {
245: // Send our next relay iteration to the target
246: SimpleRelay priorRelay = alarm.getPriorRelay();
247: Object content = alarm.getContent();
248: sendNow(priorRelay, content);
249: }
250:
251: /** Send our next relay iteration now */
252: private void sendNow(SimpleRelay priorRelay, Object content) {
253: if (priorRelay != null) {
254: // Remove query both locally and at the remote target, to cleanup
255: // the blackboard.
256: blackboard.publishRemove(priorRelay);
257: }
258:
259: // Send a new relay to the target
260: SimpleRelay relay = new SimpleRelaySource(uids.nextUID(),
261: agentId, target, content);
262: if (verbose && log.isShoutEnabled()) {
263: log.shout("Sending ping " + content + " to " + target);
264: }
265: blackboard.publishAdd(relay);
266: }
267:
268: /** Send our next relay iteration after the non-zero delayMillis */
269: private void sendLater(SimpleRelay priorRelay, Object content) {
270: // Set an alarm to call our "execute()" method in the future.
271: //
272: // An asynchronous alarm is more efficient and scalable than calling
273: // a blocking "Thread.sleep(delayMillis)", since it doesn't tie up a
274: // pooled Cougaar thread. By default, a Cougaar Node (JVM) is
275: // configured to have a limit of 30 pooled threads.
276: //
277: // Instead of removing the relay now, we hold onto it until the alarm
278: // is due. This allows the PingServlet to see the old relay on
279: // blackboard during our delay time.
280: if (verbose && log.isShoutEnabled()) {
281: log.shout("Will send ping " + content + " to " + target
282: + " in " + (delayMillis / 1000) + " seconds");
283: }
284: long futureTime = System.currentTimeMillis() + delayMillis;
285: Alarm alarm = new MyAlarm(priorRelay, content, futureTime);
286: getAlarmService().addRealTimeAlarm(alarm);
287: }
288:
289: /** An alarm that we use to wake us up after the delayMillis */
290: private class MyAlarm extends AlarmBase {
291: private SimpleRelay priorRelay;
292: private Object content;
293:
294: public MyAlarm(SimpleRelay priorRelay, Object content,
295: long futureTime) {
296: super (futureTime);
297: this .priorRelay = priorRelay;
298: this .content = content;
299: }
300:
301: public SimpleRelay getPriorRelay() {
302: return priorRelay;
303: }
304:
305: public Object getContent() {
306: return content;
307: }
308:
309: // Put this alarm on the "expiredAlarms" queue and request an "execute()"
310: public void onExpire() {
311: expiredAlarms.add(this);
312: }
313: }
314: }
|