001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.lib.aggagent.plugin;
027:
028: import java.util.Collection;
029: import java.util.Enumeration;
030: import java.util.HashMap;
031: import java.util.Iterator;
032:
033: import org.cougaar.core.blackboard.IncrementalSubscription;
034: import org.cougaar.core.mts.MessageAddress;
035: import org.cougaar.core.plugin.ComponentPlugin;
036: import org.cougaar.core.service.LoggingService;
037: import org.cougaar.lib.aggagent.query.ScriptSpec;
038: import org.cougaar.lib.aggagent.session.IncrementFormat;
039: import org.cougaar.lib.aggagent.session.RemoteBlackboardSubscription;
040: import org.cougaar.lib.aggagent.session.RemoteSession;
041: import org.cougaar.lib.aggagent.session.SubscriptionAccess;
042: import org.cougaar.lib.aggagent.session.SubscriptionWrapper;
043: import org.cougaar.lib.aggagent.session.UpdateDelta;
044: import org.cougaar.lib.aggagent.util.XmlUtils;
045: import org.cougaar.util.UnaryPredicate;
046: import org.w3c.dom.Element;
047:
048: /**
049: * This Plugin services remote subscription requests. It sends data back to an aggregation
050: * as requested. It depends on the presence of the AggDomain.
051: */
052: public class RemoteSubscriptionPlugin extends ComponentPlugin {
053: private boolean debug;
054: private Object lock = new Object();
055: private IncrementalSubscription messageSub;
056: protected MessageAddress me;
057:
058: public void setupSubscriptions() {
059: me = getAgentIdentifier();
060: messageSub = subscribeIncr(new MessageSeeker(false));
061:
062: // Get the list of queries that already exist. We need to rehandle these
063: Collection currARs = getBlackboardService().query(
064: new MessageSeeker(false));
065: if (!currARs.isEmpty()) {
066: AggRelay relay = (AggRelay) currARs.iterator().next();
067: receiveMessage(relay);
068: }
069: }
070:
071: public void execute() {
072:
073: if (log != null && log.isDebugEnabled())
074: log.debug("RemotePlugin:(" + me + ") Nmessages= "
075: + messageSub.getCollection().size());
076:
077: // process new messages
078: for (Enumeration e = messageSub.getAddedList(); e
079: .hasMoreElements();) {
080: receiveMessage((AggRelay) e.nextElement());
081: }
082:
083: // process removed sessions
084: for (Enumeration e = messageSub.getRemovedList(); e
085: .hasMoreElements();) {
086: cancelSession((AggRelay) e.nextElement());
087: }
088:
089: // process changed subscriptions
090: synchronized (lock) {
091: Iterator iter = queryMap.keySet().iterator();
092: while (iter.hasNext()) {
093: IncrementalSubscription sub = (IncrementalSubscription) iter
094: .next();
095: if (sub.hasChanged())
096: ((BBSession) queryMap.get(sub))
097: .subscriptionChanged();
098: }
099: }
100: }
101:
102: /**
103: * Receive a message.
104: * Happens when an agg agent sends me a message.
105: */
106: private void receiveMessage(AggRelay relay) {
107: try {
108: XMLMessage xmsg = (XMLMessage) relay.getContent();
109: if (log != null && log.isDebugEnabled())
110: log.debug("RemotePlugin:(" + me + ") receiveMessage "
111: + xmsg);
112:
113: Element root = XmlUtils.parse(xmsg.getText());
114: String requestName = root.getNodeName();
115: if (log != null && log.isDebugEnabled())
116: log.debug("RemotePlugin:(" + me + ") Got message: "
117: + requestName + ":" + root.toString());
118:
119: if (requestName.equals("transient_query_request")) {
120: transientQuery(root, relay);
121: } else if (requestName.equals("push_request")) {
122: createPushSession(root, relay);
123: } else if (requestName.equals("update_request")) {
124: returnUpdate(root);
125: } else if (requestName.equals("pull_request")) {
126: createPullSession(root, relay);
127: }
128: } catch (Exception ex) {
129: ex.printStackTrace();
130: }
131: }
132:
133: private void transientQuery(Element root, AggRelay relay)
134: throws Exception {
135: UnaryPredicate objectSeeker = ScriptSpec
136: .makeUnaryPredicate(XmlUtils.getChildElement(root,
137: "unary_predicate"));
138: if (objectSeeker == null)
139: throw new Exception("Could not create unary predicate");
140:
141: IncrementFormat formatter = ScriptSpec
142: .makeIncrementFormat(XmlUtils.getChildElement(root,
143: "xml_encoder"));
144: if (formatter == null)
145: throw new Exception("Could not create formatter");
146:
147: RemoteBlackboardSubscription tempSubscription = new RemoteBlackboardSubscription(
148: getBlackboardService(), objectSeeker, true);
149:
150: UpdateDelta del = new UpdateDelta(root
151: .getAttribute("cluster_id"), root
152: .getAttribute("query_id"), "");
153: // Use xml encoder to encode data from blackboard
154: tempSubscription.open();
155: try {
156: formatter.encode(del, tempSubscription);
157: } catch (Throwable err) {
158: if (err instanceof ThreadDeath)
159: throw (ThreadDeath) err;
160: del.setErrorReport(err);
161: }
162: tempSubscription.close();
163:
164: // Send response message
165: sendMessage(relay, del.toXml());
166: }
167:
168: private void createPushSession(Element root, AggRelay relay)
169: throws Exception {
170: String queryId = root.getAttribute("query_id");
171: // String requester = root.getAttribute("requester");
172:
173: UnaryPredicate seeker = ScriptSpec.makeUnaryPredicate(XmlUtils
174: .getChildElement(root, "unary_predicate"));
175: if (seeker == null)
176: throw new Exception("Could not create unary predicate");
177:
178: IncrementFormat formatter = ScriptSpec
179: .makeIncrementFormat(XmlUtils.getChildElement(root,
180: "xml_encoder"));
181: if (formatter == null)
182: throw new Exception("Could not create formatter");
183:
184: new RemotePushSession(String.valueOf(idCounter++), queryId,
185: formatter, relay, seeker);
186: }
187:
188: private int idCounter = 0;
189: private HashMap queryMap = new HashMap();
190:
191: // "BB" stands for "Blackboard". This is the abstract base class for the
192: // RemoteSession implementations used by this Plugin. It adds the ability
193: // to cancel, to respond to subscription events from the host agent, and send
194: // updates via COUGAAR messaging (all abstractly).
195: private abstract class BBSession extends RemoteSession {
196: protected AggRelay relay;
197:
198: protected BBSession(String k, String q, IncrementFormat f,
199: AggRelay r) {
200: super (k, q, f);
201: setAgentId(getAgentIdentifier().toString());
202: relay = r;
203: }
204:
205: public abstract void cancel();
206:
207: public abstract void subscriptionChanged();
208:
209: public abstract void pushUpdate();
210: }
211:
212: // This is the implementation of RemoteSession used for the PUSH method. It
213: // always sends notification immediately whenever the managed Subscription
214: // is updated by the host agent.
215: private class RemotePushSession extends BBSession {
216: private SubscriptionAccess data = null;
217: private IncrementalSubscription rawData = null;
218:
219: public RemotePushSession(String k, String q, IncrementFormat f,
220: AggRelay r, UnaryPredicate p) {
221: super (k, q, f, r);
222: synchronized (lock) {
223: rawData = subscribeIncr(new ErrorTrapPredicate(p));
224: data = new SubscriptionWrapper(rawData);
225: queryMap.put(rawData, this );
226: }
227: }
228:
229: public void cancel() {
230: synchronized (lock) {
231: queryMap.remove(rawData);
232: getBlackboardService().unsubscribe(rawData);
233: }
234: }
235:
236: public void subscriptionChanged() {
237: pushUpdate();
238: }
239:
240: public SubscriptionAccess getData() {
241: return data;
242: }
243:
244: public void pushUpdate() {
245: if (log != null && log.isDebugEnabled())
246: log.debug("Updating session to agg(" + me + "): "
247: + getQueryId());
248: sendMessage(relay, createUpdateDelta().toXml());
249: }
250: }
251:
252: /**
253: * Doesn't actually send a message, but updates an object that
254: * causes a message to be sent.
255: */
256: protected void sendMessage(AggRelay relay, String message) {
257: if (log != null && log.isDebugEnabled())
258: log.debug("RemoteSubPlugins:(" + me
259: + "):sendMessage from: " + getAgentIdentifier()
260: + " to " + relay.getSource());
261: XMLMessage msg = new XMLMessage(message);
262: relay.updateResponse(me, msg);
263: getBlackboardService().publishChange(relay);
264: if (log != null && log.isDebugEnabled())
265: log.debug("RemoteSubPlugins:(" + me
266: + "):sendMessage: done publish changed it");
267: }
268:
269: // This is the implementation of RemoteSession used for the PULL method. It
270: // uses the RemoteBlackboardSubscription class to defer event notification
271: // until requested by the client.
272: private class RemotePullSession extends BBSession {
273: private RemoteBlackboardSubscription rbs;
274:
275: public RemotePullSession(String k, String q, IncrementFormat f,
276: AggRelay r, UnaryPredicate p) {
277: super (k, q, f, r);
278: synchronized (lock) {
279: rbs = new RemoteBlackboardSubscription(
280: getBlackboardService(), new ErrorTrapPredicate(
281: p));
282: queryMap.put(rbs.getSubscription(), this );
283: }
284: }
285:
286: public void pushUpdate() {
287: if (log != null && log.isDebugEnabled())
288: log.debug("Updating session to agg(" + me + "): "
289: + getQueryId());
290: rbs.open();
291: UpdateDelta del = createUpdateDelta();
292: rbs.close();
293: sendMessage(relay, del.toXml());
294: }
295:
296: public void cancel() {
297: synchronized (lock) {
298: queryMap.remove(rbs.getSubscription());
299: rbs.shutDown();
300: }
301: }
302:
303: public void subscriptionChanged() {
304: rbs.subscriptionChanged();
305: }
306:
307: public SubscriptionAccess getData() {
308: return rbs;
309: }
310: }
311:
312: private void cancelSession(Element root) throws Exception {
313: String qId = root.getAttribute("query_id");
314: BBSession match = findSessionById(qId);
315: if (match != null)
316: match.cancel();
317: else {
318: String type = root.getNodeName();
319: if (type.equals("pull_request")
320: || type.equals("push_request"))
321: if (log != null && log.isWarnEnabled())
322: log.warn("Error cancelling session (" + me + ")"
323: + qId + " at "
324: + getAgentIdentifier().getAddress());
325: }
326: }
327:
328: private void cancelSession(AggRelay relay) {
329: try {
330: XMLMessage xmsg = (XMLMessage) relay.getContent();
331: if (log != null && log.isDebugEnabled())
332: log.debug("RemotePlugin:(" + me + ") relay deleted "
333: + xmsg);
334:
335: Element root = XmlUtils.parse(xmsg.getText());
336: cancelSession(root);
337: } catch (Exception ex) {
338: if (log != null && log.isErrorEnabled())
339: log.error("RemotePlugin:(" + me
340: + ") error deleting relay: " + ex);
341: }
342:
343: }
344:
345: private BBSession findSessionById(String id) {
346: Iterator iter = queryMap.values().iterator();
347: BBSession found = null;
348: while (iter.hasNext()) {
349: BBSession bbs = (BBSession) iter.next();
350: if (bbs.getQueryId().equals(id)) {
351: found = bbs;
352: break;
353: }
354: }
355: return found;
356: }
357:
358: private void createPullSession(Element root, AggRelay relay)
359: throws Exception {
360: String queryId = root.getAttribute("query_id");
361: // String requester = root.getAttribute("requester");
362:
363: UnaryPredicate seeker = ScriptSpec.makeUnaryPredicate(XmlUtils
364: .getChildElement(root, "unary_predicate"));
365: if (seeker == null)
366: throw new Exception("Could not create unary predicate");
367:
368: IncrementFormat formatter = ScriptSpec
369: .makeIncrementFormat(XmlUtils.getChildElement(root,
370: "xml_encoder"));
371: if (formatter == null)
372: throw new Exception("Could not create formatter");
373:
374: new RemotePullSession(String.valueOf(idCounter++), queryId,
375: formatter, relay, seeker);
376: if (log != null && log.isDebugEnabled())
377: log.debug("Pull session created(" + me + ")");
378: }
379:
380: private void returnUpdate(Element root) throws Exception {
381: String qId = root.getAttribute("query_id");
382: String requester = root.getAttribute("requester");
383:
384: BBSession bbs = findSessionById(qId);
385: if (bbs != null)
386: bbs.pushUpdate();
387: else if (log != null && log.isWarnEnabled())
388: log.warn("Error: (" + me
389: + ") query not found while updating " + qId
390: + " for " + requester);
391: }
392:
393: protected LoggingService log = null;
394:
395: public void setLoggingService(LoggingService ls) {
396: if ((ls == null) && (log != null) && log.isDebugEnabled())
397: log.debug("Logger (" + me + ")being reset to null");
398: log = ls;
399: if ((log != null) && log.isDebugEnabled())
400: log.debug("Logging (" + me + ")initialized");
401: }
402:
403: public LoggingService getLoggingService() {
404: return log;
405: }
406:
407: protected IncrementalSubscription subscribeIncr(UnaryPredicate p) {
408: return (IncrementalSubscription) getBlackboardService()
409: .subscribe(p);
410: }
411:
412: }
|