001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.relay;
028:
029: import java.util.Collection;
030: import java.util.Enumeration;
031: import java.util.Iterator;
032:
033: import org.cougaar.core.blackboard.IncrementalSubscription;
034: import org.cougaar.core.blackboard.Subscription;
035: import org.cougaar.core.component.ServiceBroker;
036: import org.cougaar.core.logging.LoggingServiceWithPrefix;
037: import org.cougaar.core.mts.MessageAddress;
038: import org.cougaar.core.plugin.ComponentPlugin;
039: import org.cougaar.core.service.LoggingService;
040: import org.cougaar.core.service.UIDService;
041: import org.cougaar.core.util.UID;
042: import org.cougaar.util.UnaryPredicate;
043:
044: /**
045: * This component is an example {@link SimpleRelay} client, which
046: * both sends relays and replies to them.
047: * <p>
048: * To use, add this component to an agent and specify a target,
049: * for example in "AgentA" with a target of "AgentB":<pre>
050: * &lt;component
051: * name='org.cougaar.core.relay.SimpleRelayExample(target=AgentB)'
052: * class='org.cougaar.core.relay.SimpleRelayExample'
053: * priority='COMPONENT'
054: * insertionpoint='Node.AgentManager.Agent.PluginManager.Plugin'&gt;
055: * &lt;argument&gt;target=AgentB&lt;/argument&gt;
056: * &lt;/component&gt;
057: * </pre>
058: * In the target agent add the component without a target argument:<pre>
059: * &lt;component
060: * name='org.cougaar.core.relay.SimpleRelayExample'
061: * class='org.cougaar.core.relay.SimpleRelayExample'
062: * priority='COMPONENT'
063: * insertionpoint='Node.AgentManager.Agent.PluginManager.Plugin'/&gt;
064: * </pre>
065: * You should see output similar to the following, which excludes
066: * logging timestamps and other details:<pre>
067: * .. AgentA: Sending (.. query=ping reply=null)
068: * .. AgentB: Reply (.. query=ping reply=echo-ping)
069: * .. AgentA: Received (.. query=ping reply=echo-ping)
070: * </pre>
071: * <p>
072: * It would be straightforward to extend this example to a more
073: * general remote procedure call (<u>RPC</u>) utility: the query
074: * specifies a String "method" name and Object[] parameters, and
075: * the reply is either a Throwable or a non-error value, plus a
076: * wrapper if the non-error value is null or a Throwable. As in
077: * RMI, the parameters and return value must be Serializable and
078: * treated as immutable.
079: */
080: public class SimpleRelayExample extends ComponentPlugin {
081:
082: private LoggingService log;
083: private UIDService uids;
084:
085: private IncrementalSubscription sub;
086:
087: public void load() {
088: super .load();
089:
090: // get services
091: ServiceBroker sb = getServiceBroker();
092: log = (LoggingService) sb.getService(this ,
093: LoggingService.class, null);
094: uids = (UIDService) sb.getService(this , UIDService.class, null);
095:
096: // prefix all logging calls with our agent name
097: log = LoggingServiceWithPrefix.add(log, agentId + ": ");
098:
099: if (log.isDebugEnabled()) {
100: log.debug("loaded");
101: }
102: }
103:
104: protected void setupSubscriptions() {
105: if (log.isDebugEnabled()) {
106: log.debug("setupSubscriptions");
107: }
108:
109: // create relay subscription
110: sub = (IncrementalSubscription) blackboard
111: .subscribe(new MyPred(agentId));
112:
113: // send relays
114: for (Iterator iter = getParameters().iterator(); iter.hasNext();) {
115: String s = (String) iter.next();
116: if (!s.startsWith("target=")) {
117: continue;
118: }
119: String target_name = s.substring("target=".length());
120: MessageAddress target = MessageAddress
121: .getMessageAddress(target_name);
122: if (agentId.equals(target)) {
123: if (log.isWarnEnabled()) {
124: log.warn("Ignoring target that matches self: "
125: + target);
126: }
127: continue;
128: }
129: UID uid = uids.nextUID();
130: Object query = "ping";
131: SimpleRelay sr = new SimpleRelaySource(uid, agentId,
132: target, query);
133: if (log.isShoutEnabled()) {
134: log.shout("Sending " + sr);
135: }
136: blackboard.publishAdd(sr);
137: }
138: }
139:
140: protected void execute() {
141: if (log.isDebugEnabled()) {
142: log.debug("execute");
143: }
144:
145: if (!sub.hasChanged()) {
146: // usually never happens, since the only reason to execute
147: // is a subscription change
148: return;
149: }
150:
151: // observe added relays
152: for (Enumeration en = sub.getAddedList(); en.hasMoreElements();) {
153: SimpleRelay sr = (SimpleRelay) en.nextElement();
154: if (log.isDebugEnabled()) {
155: log.debug("observe added " + sr);
156: }
157: if (agentId.equals(sr.getTarget())) {
158: // send back reply
159: sr.setReply("echo-" + sr.getQuery());
160: if (log.isShoutEnabled()) {
161: log.shout("Reply " + sr);
162: }
163: blackboard.publishChange(sr);
164: } else {
165: // ignore relays we sent
166: }
167: }
168:
169: // observe changed relays
170: for (Enumeration en = sub.getChangedList(); en
171: .hasMoreElements();) {
172: SimpleRelay sr = (SimpleRelay) en.nextElement();
173: if (log.isDebugEnabled()) {
174: log.debug("observe changed " + sr);
175: }
176: if (agentId.equals(sr.getSource())) {
177: // got back answer
178: if (log.isShoutEnabled()) {
179: log.shout("Received " + sr);
180: }
181: // remove query both locally and at the remote target.
182: //
183: // this is optional, but it's a good idea to clean up and
184: // free some memory.
185: blackboard.publishRemove(sr);
186: } else {
187: // ignore our reply
188: }
189: }
190:
191: if (log.isDebugEnabled()) {
192: // removed relays
193: for (Enumeration en = sub.getRemovedList(); en
194: .hasMoreElements();) {
195: SimpleRelay sr = (SimpleRelay) en.nextElement();
196: log.debug("observe removed " + sr);
197: }
198: }
199: }
200:
201: /**
202: * My subscription predicate, which matches SimpleRelays where my
203: * local address matches either the source or target.
204: */
205: private static class MyPred implements UnaryPredicate {
206: private final MessageAddress agentId;
207:
208: public MyPred(MessageAddress agentId) {
209: this .agentId = agentId;
210: }
211:
212: public boolean execute(Object o) {
213: if (o instanceof SimpleRelay) {
214: SimpleRelay sr = (SimpleRelay) o;
215: return (agentId.equals(sr.getSource()) || agentId
216: .equals(sr.getTarget()));
217: }
218: return false;
219: }
220: }
221: }
|