001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2007 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.mobility.plugin;
028:
029: import java.util.ArrayList;
030: import java.util.Collection;
031: import java.util.Collections;
032: import java.util.Enumeration;
033: import java.util.Iterator;
034: import java.util.LinkedHashMap;
035: import java.util.List;
036: import java.util.Map;
037: import org.cougaar.core.agent.AgentContainer;
038: import org.cougaar.core.blackboard.IncrementalSubscription;
039: import org.cougaar.core.domain.Factory;
040: import org.cougaar.core.mobility.AddTicket;
041: import org.cougaar.core.mobility.ldm.AgentControl;
042: import org.cougaar.core.mobility.ldm.MobilityFactory;
043: import org.cougaar.core.mts.MessageAddress;
044: import org.cougaar.core.node.NodeControlService;
045: import org.cougaar.core.node.NodeIdentificationService;
046: import org.cougaar.core.plugin.ComponentPlugin;
047: import org.cougaar.core.service.DomainService;
048: import org.cougaar.core.service.LoggingService;
049: import org.cougaar.core.service.ThreadService;
050: import org.cougaar.core.service.UIDService;
051: import org.cougaar.core.util.UID;
052: import org.cougaar.util.UnaryPredicate;
053:
054: /**
055: * This plugin adds agents to the local or remote nodes, as specified by
056: * its plugin parameter list.
057: * <p>
058: * This is really just a souped-up version of the {@link
059: * AddAgentExamplePlugin}.
060: * <p>
061: * Each plugin parameter is a "key=value" pair, where the default key
062: * is "agent". The keys are:<dl>
063: * <dt>loopback=true</dt><dd>
064: * If the target is the local node and this plugin is loaded in the
065: * node agent, then use the local node's "addAgent" method instead of
066: * a mobility blackboard relay. This avoids the mobility domain if
067: * all agents are local.</dd>
068: * <dt>parallel</dt><dd>
069: * Spawn agents in parallel, instead of waiting for each add to
070: * complete before adding the next agent.</dd>
071: * <dt>agent</dt><dd>
072: * An add-agent command, see {@link #parseAgent}. This can
073: * simply be a list of agent names, e.g.:<pre>
074: * <argument>Foo</argument>
075: * <argument>Bar</argument>
076: * </pre></dd>
077: * </dl>
078: */
079: public class SpawnAgents extends ComponentPlugin {
080:
081: // if an add is to the local node, and we're in the node-agent,
082: // then use the local "addAgent" method instead of a relay.
083: protected static final boolean DEFAULT_LOOPBACK = true;
084: // spawn agents in parallel
085: protected static final boolean DEFAULT_PARALLEL = false;
086:
087: protected String localNode;
088: protected LoggingService log;
089: protected UIDService uidService;
090:
091: // these services may not be available and are not always required,
092: // so we'll only
093: private boolean setMobilityFactory = false;
094: private MobilityFactory _mobilityFactory;
095: private boolean setAgentContainer = false;
096: private AgentContainer _agentContainer;
097: private boolean setThreadService = false;
098: private ThreadService _threadService;
099:
100: // only use if loopback is enabled:
101: private final List completedLocalAdds = new ArrayList();
102:
103: protected IncrementalSubscription sub;
104: protected UID pluginId;
105:
106: // only used if !parallel
107: protected boolean addInProgress;
108:
109: protected boolean loopback = DEFAULT_LOOPBACK;
110: protected boolean parallel = DEFAULT_PARALLEL;
111: protected Map agents = Collections.EMPTY_MAP;
112:
113: public void setNodeIdentificationService(
114: NodeIdentificationService nis) {
115: localNode = (nis == null ? null : nis.getMessageAddress()
116: .getAddress());
117: }
118:
119: public void setLoggingService(LoggingService log) {
120: this .log = log;
121: }
122:
123: public void setUIDService(UIDService uidService) {
124: this .uidService = uidService;
125: }
126:
127: protected AgentContainer getAgentContainer() {
128: if (!setAgentContainer) {
129: setAgentContainer = true;
130: NodeControlService ncs = (NodeControlService) getServiceBroker()
131: .getService(this , NodeControlService.class, null);
132: if (ncs != null) {
133: _agentContainer = ncs.getRootContainer();
134: getServiceBroker().releaseService(this ,
135: NodeControlService.class, ncs);
136: }
137: }
138: return _agentContainer;
139: }
140:
141: protected ThreadService getThreadService() {
142: if (!setThreadService) {
143: setThreadService = true;
144: _threadService = (ThreadService) getServiceBroker()
145: .getService(this , ThreadService.class, null);
146: }
147: return _threadService;
148: }
149:
150: protected MobilityFactory getMobilityFactory() {
151: if (!setMobilityFactory) {
152: setMobilityFactory = true;
153: DomainService ds = (DomainService) getServiceBroker()
154: .getService(this , DomainService.class, null);
155: if (ds != null) {
156: _mobilityFactory = (MobilityFactory) ds
157: .getFactory("mobility");
158: getServiceBroker().releaseService(this ,
159: DomainService.class, ds);
160: }
161: }
162: return _mobilityFactory;
163: }
164:
165: public void setupSubscriptions() {
166: parseParameters(getParameters());
167:
168: // listen for responses to our add requests
169: //
170: // if all the agents are local then we'll never publish a relay, but
171: // we won't know that until we've processed the agents list
172: pluginId = uidService.nextUID();
173: sub = (IncrementalSubscription) blackboard
174: .subscribe(new AgentControlPredicate(pluginId));
175:
176: // TODO handle rehydration?
177:
178: if (parallel) {
179: // request all our agent adds in parallel
180: addAllAgents();
181: } else {
182: // add the first agent
183: Object o = getNextAgent();
184: if (o != null) {
185: addInProgress = true;
186: addAgent(o);
187: }
188: }
189: }
190:
191: public void execute() {
192: // check for completed local (loopback) adds
193: if (loopback) {
194: List l = takeCompletedLocalAdds();
195: for (int i = 0, n = l.size(); i < n; i++) {
196: CompletedLocalAdd cla = (CompletedLocalAdd) l.get(i);
197: String agent = cla.getAgent();
198: Throwable ex = cla.getException();
199:
200: completedAdd(agent, ex);
201: }
202: }
203:
204: // check for completed remote adds, which may include local adss if
205: // loopback is disabled or unavailable
206: for (Enumeration en = sub.getChangedList(); en
207: .hasMoreElements();) {
208: AgentControl ac = (AgentControl) en.nextElement();
209: int status = ac.getStatusCode();
210: if (status == AgentControl.NONE) {
211: // keep waiting, not sure why we were notified
212: continue;
213: }
214: AddTicket t = (AddTicket) ac.getAbstractTicket();
215: String agent = t.getMobileAgent().getAddress();
216: Throwable ex;
217: if (status == AgentControl.CREATED) {
218: ex = null;
219: } else {
220: ex = ac.getFailureStackTrace();
221: if (ex == null) {
222: ex = new RuntimeException("Unknown failure: " + ac);
223: }
224: }
225: // cleanup completed relay
226: blackboard.publishRemove(ac);
227:
228: completedAdd(agent, ex);
229: }
230: }
231:
232: protected void completedAdd(String agent, Throwable ex) {
233: // report status
234: if (ex == null) {
235: if (log.isInfoEnabled()) {
236: log.info("Added agent " + agent);
237: }
238: } else {
239: if (log.isErrorEnabled()) {
240: log.error("Unable to add agent " + agent, ex);
241: }
242: }
243:
244: if (parallel) {
245: // we've already submitted all our adds, so we don't expect any more
246: // agents. However, a subclass may override that method in some
247: // odd/clever way, so we'll check again anyways.
248: addAllAgents();
249: return;
250: }
251:
252: // record completed add
253: addInProgress = false;
254:
255: Object o = getNextAgent();
256: if (o == null) {
257: // no more agents to add
258: return;
259: }
260:
261: // add the next agent
262: addInProgress = true;
263: addAgent(o);
264: }
265:
266: /** Submit all adds in parallel. */
267: protected void addAllAgents() {
268: while (true) {
269: Object o = getNextAgent();
270: if (o == null)
271: break;
272: addAgent(o);
273: }
274: }
275:
276: /**
277: * Get the next add-agent command.
278: * <p>
279: * Note that a subclass can override this method for greater control
280: * over the iteration order and values.
281: * @see #parseAgents
282: */
283: protected Object getNextAgent() {
284: if (agents.isEmpty()) {
285: return null;
286: }
287: Iterator iter = agents.entrySet().iterator();
288: Map.Entry me = (Map.Entry) iter.next();
289: Object o = me.getValue();
290: if (o == null) {
291: o = me.getKey();
292: }
293: iter.remove();
294: return o;
295: }
296:
297: /** Request a non-blocking agent add */
298: protected void addAgent(Object o) {
299: // parse command
300: String agent;
301: String node = localNode;
302: Object options = null;
303: if (o instanceof String) {
304: agent = (String) o;
305: } else if (o instanceof AddCommand) {
306: AddCommand ac = (AddCommand) o;
307: agent = ac.getAgent();
308: node = ac.getNode();
309: options = ac.getOptions();
310: } else {
311: log.error("Invalid object type: " + o);
312: return;
313: }
314:
315: addAgent(agent, options, node);
316: }
317:
318: /** Request a non-blocking agent add */
319: protected void addAgent(String agent, Object options, String node) {
320: if (loopback && localNode.equals(node)) {
321: // attempt to add this local agent without using the mobility factory.
322: // This should only return false if this plugin can't obtain the
323: // agentContainer (e.g. we're not in the node agent).
324: if (addLocalAgent(agent, options)) {
325: return;
326: }
327: }
328:
329: // use the mobility factory to publish a blackboard add request
330: addRemoteAgent(agent, options, node);
331: }
332:
333: /** @return false if the caller must use #addRemoteAgent */
334: protected boolean addLocalAgent(final String agent,
335: final Object options) {
336: // get the node's container
337: final AgentContainer agentContainer = getAgentContainer();
338: if (agentContainer == null) {
339: // not in the node-agent, must use a mobility relay
340: return false;
341: }
342: // get the thread service
343: //
344: // We need to spawn a thread, since we're in a blackboard transaction.
345: // If we attempted to call "addLocalAgent" from our thread then we'd get
346: // "nested transaction" exceptions.
347: ThreadService threadService = getThreadService();
348: if (threadService == null) {
349: // unable to obtain thread service? must use a mobility relay
350: return false;
351: }
352: Runnable runner = new Runnable() {
353: public void run() {
354: // add the agent
355: Throwable ex = null;
356: try {
357: // TODO support options
358: agentContainer.addAgent(MessageAddress
359: .getMessageAddress(agent));
360: } catch (Throwable t) {
361: ex = t;
362: }
363:
364: // request an "execute()" cycle to process the response
365: synchronized (completedLocalAdds) {
366: completedLocalAdds.add(new CompletedLocalAdd(agent,
367: ex));
368: }
369: blackboard.signalClientActivity();
370: }
371: };
372: threadService.getThread(this , runner, "Add agent " + agent)
373: .start();
374:
375: // "add" is running in a background thread
376: return true;
377: }
378:
379: protected List takeCompletedLocalAdds() {
380: List ret = Collections.EMPTY_LIST;
381: synchronized (completedLocalAdds) {
382: if (!completedLocalAdds.isEmpty()) {
383: ret = new ArrayList(completedLocalAdds);
384: completedLocalAdds.clear();
385: }
386: }
387: return ret;
388: }
389:
390: /** Add an agent using a mobility relay */
391: protected void addRemoteAgent(String agent, Object options,
392: String node) {
393: // we need to use mobility relays
394: MobilityFactory mobilityFactory = getMobilityFactory();
395: if (mobilityFactory == null) {
396: throw new RuntimeException("Missing \"mobility\" domain");
397: }
398:
399: // create an add-ticket
400: // TODO support options
401: AddTicket addTicket = new AddTicket(uidService.nextUID(),
402: MessageAddress.getMessageAddress(agent), MessageAddress
403: .getMessageAddress(node));
404:
405: // create the blackboard object
406: AgentControl ac = mobilityFactory.createAgentControl(pluginId,
407: MessageAddress.getMessageAddress(node), addTicket);
408:
409: if (log.isInfoEnabled()) {
410: log.info("Adding agent " + agent + " to "
411: + (localNode.equals(node) ? "local" : "remote")
412: + " node " + node);
413: }
414:
415: // send it to the node
416: blackboard.publishAdd(ac);
417: }
418:
419: /**
420: * Parse the plugin parameters.
421: */
422: protected void parseParameters(Collection parameters) {
423: if (parameters.isEmpty()) {
424: return;
425: }
426: // extract agent strings, set local options (e.g. loopback)
427: List l = new ArrayList();
428: int i = 0;
429: for (Iterator iter = parameters.iterator(); iter.hasNext(); i++) {
430: String s = (String) iter.next();
431: int sep = s.indexOf('=');
432: String key;
433: String value;
434: if (sep < 0) {
435: key = "agent";
436: value = s;
437: } else {
438: key = s.substring(0, sep).trim();
439: value = s.substring(sep + 1).trim();
440: }
441: if ("agent".equals(key)) {
442: l.add(value);
443: } else {
444: parseParameter(key, value);
445: }
446: }
447: // parse agents map
448: parseAgents(l);
449: }
450:
451: /**
452: * Parse a non-agent parameter.
453: */
454: protected void parseParameter(String key, String value) {
455: if ("loopback".equals(key)) {
456: loopback = "true".equals(value);
457: } else if ("parallel".equals(key)) {
458: parallel = "true".equals(value);
459: } else {
460: throw new IllegalArgumentException("Invalid parameter: "
461: + key + "=" + value);
462: }
463: }
464:
465: /**
466: * A subclass can override this method to specify a dynamically generated
467: * parameter list, e.g.:<pre>
468: * protected void parseAgents(List dummy) {
469: * List l = new ArrayList();
470: * if (localNode.equals("A")) {
471: * l.add("foo");
472: * } else {
473: * l.add("bar");
474: * l.add("qux");
475: * }
476: * super.parseAgents(l);
477: * }
478: * </pre>
479: * <p>
480: * Also see {@link #getNextAgent}, which accesses the "agents" table
481: * created by this method.
482: */
483: protected void parseAgents(List l) {
484: agents = new LinkedHashMap();
485: for (int i = 0; i < l.size(); i++) {
486: Object o = parseAgent((String) l.get(i));
487: String agent;
488: if (o instanceof String) {
489: agent = (String) o;
490: } else if (o instanceof AddCommand) {
491: agent = ((AddCommand) o).getAgent();
492: } else if (o == null) {
493: continue;
494: } else {
495: throw new IllegalArgumentException("Invalid agent[" + i
496: + "]: " + o);
497: }
498: agents.put(agent, o);
499: }
500: if (log.isDebugEnabled()) {
501: log
502: .debug("parsed agents[" + agents.size() + "]: "
503: + agents);
504: }
505: }
506:
507: /**
508: * Parse an "agent=<i>STRING</i>" command specified in the plugin
509: * parameters.
510: * <p>
511: * An agent string is either an agent name, a name followed by an "@"
512: * and a node name, or either of the prior options followed by an " if "
513: * and a node name. This allows the client to specific all of the
514: * following simple commands:
515: * <pre>
516: * <!-- add agent A to the local node -->
517: * <argument>A</argument>
518: *
519: * <!-- add agent B to node X, which may be remote -->
520: * <argument>B @ X</argument>
521: *
522: * <!-- if this is node Y, then add agent C to the local node -->
523: * <argument>C if Y</argument>
524: *
525: * <!-- if this is node Z, then add agent D to node Q -->
526: * <argument>D @ Q if Z</argument>
527: * </pre>
528: *
529: * @return an agent name, an AddCommand, or null
530: */
531: protected Object parseAgent(String s) {
532: String value = s;
533: int sep = value.indexOf(" if ");
534: if (sep > 0) {
535: String test = value.substring(sep + " if ".length()).trim();
536: if (!localNode.equals(test)) {
537: if (log.isDebugEnabled()) {
538: log.debug("skipping " + value);
539: }
540: return null;
541: }
542: value = value.substring(0, sep).trim();
543: }
544: String agent = value;
545: String node = localNode;
546: sep = value.indexOf('@');
547: if (sep >= 0) {
548: agent = value.substring(0, sep).trim();
549: node = value.substring(sep + 1).trim();
550: }
551: if (node.equals(localNode) || node.length() == 0) {
552: return agent;
553: } else {
554: return new AddCommand(agent, node);
555: }
556: }
557:
558: protected static class AddCommand {
559: private final String agent;
560: private final String node;
561: private final Object options;
562:
563: public AddCommand(String agent, String node) {
564: this (agent, node, null);
565: }
566:
567: public AddCommand(String agent, String node, Object options) {
568: this .agent = agent;
569: this .node = node;
570: this .options = options;
571: }
572:
573: public String getAgent() {
574: return agent;
575: }
576:
577: public String getNode() {
578: return node;
579: }
580:
581: public Object getOptions() {
582: return options;
583: }
584:
585: public String toString() {
586: return agent + " @ " + node;
587: }
588: }
589:
590: protected static final class AgentControlPredicate implements
591: UnaryPredicate {
592: private final UID pluginId;
593:
594: public AgentControlPredicate(UID pluginId) {
595: this .pluginId = pluginId;
596: }
597:
598: public boolean execute(Object o) {
599: return (o instanceof AgentControl && pluginId
600: .equals(((AgentControl) o).getOwnerUID()));
601: }
602: }
603:
604: protected static class CompletedLocalAdd {
605: private final String agent;
606: private final Throwable ex;
607:
608: public CompletedLocalAdd(String agent, Throwable ex) {
609: this .agent = agent;
610: this .ex = ex;
611: }
612:
613: public String getAgent() {
614: return agent;
615: }
616:
617: public Throwable getException() {
618: return ex;
619: }
620:
621: public String toString() {
622: return "(completed-local-add agent=" + agent + " success="
623: + (ex == null) + ")";
624: }
625: }
626: }
|