001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.lib.aggagent.plugin;
027:
028: import java.util.Collection;
029: import java.util.Enumeration;
030: import java.util.Iterator;
031: import java.util.TimerTask;
032: import java.util.Vector;
033:
034: import org.cougaar.core.blackboard.IncrementalSubscription;
035: import org.cougaar.core.mts.MessageAddress;
036: import org.cougaar.core.plugin.ComponentPlugin;
037: import org.cougaar.core.service.LoggingService;
038: import org.cougaar.core.service.UIDService;
039: import org.cougaar.lib.aggagent.query.AggregationQuery;
040: import org.cougaar.lib.aggagent.query.QueryResultAdapter;
041: import org.cougaar.lib.aggagent.session.UpdateDelta;
042: import org.cougaar.lib.aggagent.util.InverseSax;
043: import org.cougaar.lib.aggagent.util.XmlUtils;
044: import org.cougaar.lib.aggagent.util.Enum.QueryType;
045: import org.cougaar.lib.aggagent.util.Enum.UpdateMethod;
046: import org.cougaar.util.UnaryPredicate;
047: import org.w3c.dom.Element;
048:
049: /**
050: * Receives aggregation requests in the form of QueryResultAdapter objects.
051: */
052: public class AggregationPlugin extends ComponentPlugin {
053: // Subscribe to all QueryResultAdapter objects
054: private IncrementalSubscription querySub;
055: private IncrementalSubscription messageSub;
056:
057: private static class QuerySeeker implements UnaryPredicate {
058: public boolean execute(Object o) {
059: return o instanceof QueryResultAdapter;
060: }
061: }
062:
063: /**
064: * A convenience method for creating IncrementalSubscriptions
065: */
066: protected IncrementalSubscription subscribeIncr(UnaryPredicate p) {
067: return (IncrementalSubscription) getBlackboardService()
068: .subscribe(p);
069: }
070:
071: protected MessageAddress me;
072:
073: public void setupSubscriptions() {
074: me = getAgentIdentifier();
075: querySub = subscribeIncr(new QuerySeeker());
076: messageSub = subscribeIncr(new MessageSeeker(true));
077: }
078:
079: public void execute() {
080:
081: if (log != null && log.isDebugEnabled())
082: log.debug("(" + me + ")AggPlugin: execute");
083:
084: checkNewMessages();
085: checkNewQueries();
086: checkUpdatedQueries();
087: checkRemovedQueries();
088: }
089:
090: private void checkNewMessages() {
091: // Only the changed messages are interesting.
092: // The old ones will be deleted.
093: // We need to ignore the ones that have *already* been deleted, though.
094: Collection removedARs = messageSub.getRemovedCollection();
095: for (Enumeration e = messageSub.getChangedList(); e
096: .hasMoreElements();) {
097: if (!removedARs.contains(e))
098: receiveMessage((AggRelay) e.nextElement());
099: }
100: }
101:
102: private void checkNewQueries() {
103: for (Enumeration e = querySub.getAddedList(); e
104: .hasMoreElements();) {
105: QueryResultAdapter qra = (QueryResultAdapter) e
106: .nextElement();
107: AggregationQuery aq = qra.getQuery();
108:
109: if (aq.getType() == QueryType.PERSISTENT) {
110: if (aq.getUpdateMethod() == UpdateMethod.PUSH) {
111: // create a push session on each of the source clusters listed
112: // in query
113: for (Enumeration sc = aq.getSourceClusters(); sc
114: .hasMoreElements();) {
115: String clusterString = (String) sc
116: .nextElement();
117: requestPushSession(qra.getID(), clusterString,
118: qra);
119: }
120: } else {
121: for (Enumeration sc = aq.getSourceClusters(); sc
122: .hasMoreElements();) {
123: String clusterString = (String) sc
124: .nextElement();
125: requestPullSession(qra.getID(), clusterString,
126: qra);
127: }
128: // set up the aggregator timer
129: if (qra.getQuery().getPullRate() >= 0) {
130: java.util.Timer pullTimer = new java.util.Timer();
131: long waitPeriod = (long) (qra.getQuery()
132: .getPullRate() * 1000);
133: pullTimer.scheduleAtFixedRate(
134: getTimerTask(qra), 0, waitPeriod);
135: qra.getQuery().setPullTimer(pullTimer);
136: }
137: }
138: } else {
139: // query each cluster
140: for (Enumeration sc = aq.getSourceClusters(); sc
141: .hasMoreElements();) {
142: String clusterString = (String) sc.nextElement();
143: queryCluster(clusterString, qra);
144: }
145: }
146: }
147: }
148:
149: private void checkUpdatedQueries() {
150: for (Enumeration e = querySub.getChangedList(); e
151: .hasMoreElements();) {
152: QueryResultAdapter qra = (QueryResultAdapter) e
153: .nextElement();
154: AggregationQuery aq = qra.getQuery();
155: String queryId = qra.getID();
156:
157: try {
158: // ignore update of transient queries
159: if (qra.getQuery().getType() == QueryType.PERSISTENT) {
160: Vector removedClusters = qra
161: .getAndResetRemovedClusters();
162: // remove the old cluster relays
163: if (removedClusters != null
164: && !removedClusters.isEmpty()) {
165: Iterator iter = messageSub.getCollection()
166: .iterator();
167: while (iter.hasNext()) {
168: AggRelay ar = (AggRelay) iter.next();
169: XMLMessage xmsg = (XMLMessage) ar
170: .getContent();
171: Element root = XmlUtils.parse(xmsg
172: .getText());
173: String this _query_id = root
174: .getAttribute("query_id");
175: Iterator targets = ar.getTargets()
176: .iterator();
177: while (targets.hasNext()) {
178: String this _cluster_id = targets.next()
179: .toString();
180: if (queryId.equals(this _query_id)
181: && removedClusters
182: .contains(this _cluster_id)) {
183: getBlackboardService()
184: .publishRemove(ar);
185: }
186: }
187: }
188: if (log != null && log.isDebugEnabled())
189: log.debug("(" + me
190: + ")Updating remote session "
191: + qra.getID());
192: }
193: Vector addedClusters = qra
194: .getAndResetAddedClusters();
195: // add the new cluster relays
196: if (addedClusters != null
197: && !addedClusters.isEmpty()) {
198: if (log != null && log.isDebugEnabled())
199: log.debug("(" + me
200: + ")Updating remote session "
201: + qra.getID());
202: Enumeration newClusters = addedClusters
203: .elements();
204: while (newClusters.hasMoreElements()) {
205: String clusterId = (String) newClusters
206: .nextElement();
207: if (aq.getUpdateMethod() == UpdateMethod.PUSH) {
208: requestPushSession(queryId, clusterId,
209: qra);
210: } else {
211: requestPullSession(queryId, clusterId,
212: qra);
213: }
214: }
215: }
216: }
217: } catch (Exception ioe) {
218: if (log != null && log.isErrorEnabled())
219: log.error("AggPlugin:(" + me
220: + "):error updating session" + ioe);
221: }
222: }
223: }
224:
225: private void checkRemovedQueries() {
226: for (Enumeration e = querySub.getRemovedList(); e
227: .hasMoreElements();) {
228: QueryResultAdapter qra = (QueryResultAdapter) e
229: .nextElement();
230: AggregationQuery aq = qra.getQuery();
231: String queryId = qra.getID();
232:
233: // ignore removal of transient queries
234: if (aq.getType() == QueryType.PERSISTENT) {
235: if (aq.getUpdateMethod() == UpdateMethod.PULL) {
236: // cancel local pull session
237: qra.getQuery().getPullTimer().cancel();
238: }
239:
240: // cancel session on each of the source clusters listed in query
241: if (log != null && log.isDebugEnabled())
242: log.debug("(" + me + ")Cancelling remote session "
243: + queryId);
244: cancelRemoteSession(queryId);
245: }
246: }
247: }
248:
249: private String frameRequestXml(String action, String qId,
250: String cId, boolean requester, AggregationQuery query) {
251: InverseSax request = new InverseSax();
252: request.addElement(action);
253: request.addAttribute("query_id", qId);
254: if (cId != null)
255: request.addAttribute("cluster_id", cId);
256: if (requester)
257: request.addAttribute("requester", getAgentIdentifier()
258: .toString());
259: if (query != null)
260: query.includeScriptXml(request);
261: request.endElement();
262: return request.toString();
263: }
264:
265: /**
266: * send query to cluster
267: */
268: private void queryCluster(String cId, QueryResultAdapter qra) {
269: sendMessage(createAggAddress(cId), frameRequestXml(
270: "transient_query_request", qra.getID(), cId, false, qra
271: .getQuery()));
272: }
273:
274: /**
275: * Send request to given Generic Plugin URL for a push session back to
276: * this cluster.
277: */
278: private void requestPushSession(String queryId, String clusterId,
279: QueryResultAdapter qra) {
280: sendMessage(createAggAddress(clusterId), frameRequestXml(
281: "push_request", queryId, null, true, qra.getQuery()));
282: if (log != null && log.isDebugEnabled())
283: log.debug("AggPlugin:(" + me
284: + "):requestPushSession: sent message");
285: }
286:
287: /**
288: * Send request to given Generic Plugin URL for a pull session back to
289: * this cluster.
290: */
291: private void requestPullSession(String queryId, String clusterId,
292: QueryResultAdapter qra) {
293: sendMessage(createAggAddress(clusterId), frameRequestXml(
294: "pull_request", queryId, null, true, qra.getQuery()));
295: }
296:
297: private TimerTask getTimerTask(QueryResultAdapter qra) {
298: return new PullTimerTask(qra);
299: }
300:
301: private void cancelRemoteSession(String queryId) {
302: // todo: this is horribly inefficient.
303: try {
304: Iterator iter = messageSub.getCollection().iterator();
305: while (iter.hasNext()) {
306: AggRelay ar = (AggRelay) iter.next();
307: XMLMessage xmsg = (XMLMessage) ar.getContent();
308: Element root = XmlUtils.parse(xmsg.getText());
309: String this _id = root.getAttribute("query_id");
310: if (queryId.equals(this _id)) {
311: getBlackboardService().publishRemove(ar);
312: if (log != null && log.isDebugEnabled())
313: log.debug("AggPlugin:(" + me
314: + "):canceled session at "
315: + ar.getTargets().iterator().next());
316: }
317: }
318: } catch (Exception ioe) {
319: if (log != null && log.isErrorEnabled())
320: log.error("AggPlugin:(" + me
321: + "):error canceling session" + ioe);
322: }
323: }
324:
325: /**
326: * Receive a message.
327: * Happens when a remote cluster sends me an update.
328: */
329: private void receiveMessage(AggRelay relay) {
330: try {
331: if (log != null && log.isDebugEnabled())
332: log.debug("AggPlugin:(" + me + "):receiveMessage");
333: XMLMessage xmsg = (XMLMessage) relay.getResponse();
334: Element root = XmlUtils.parse(xmsg.getText());
335: // String requestName = root.getNodeName();
336:
337: //
338: // Handle a response to one of my previous queries
339: //
340: UpdateDelta delta = new UpdateDelta(root);
341:
342: String updatedQuery = delta.getQueryId();
343: String updatedCluster = delta.getAgentId();
344:
345: if (log != null && log.isDebugEnabled())
346: log.debug("AggPlugin (" + me
347: + ")Received a message at "
348: + getAgentIdentifier()
349: + " --- Query update to :" + updatedQuery
350: + " from " + updatedCluster);
351:
352: // find query result adapter on blackboard
353: Iterator updatedQueries = getBlackboardService().query(
354: new QueryRAFinder(updatedQuery)).iterator();
355:
356: if (updatedQueries.hasNext()) {
357: QueryResultAdapter qra = (QueryResultAdapter) updatedQueries
358: .next();
359:
360: // update query result set based on reported changes
361: qra.updateResults(delta);
362:
363: // publish changes to blackboard
364: getBlackboardService().publishChange(qra);
365: // Am I done with thie relay?
366: if (qra.getQuery().getType()
367: .equals(QueryType.TRANSIENT))
368: getBlackboardService().publishRemove(relay);
369: } else {
370: if (log != null && log.isErrorEnabled())
371: log.error("AggPlugin: unable to find query ID: "
372: + updatedQuery);
373: }
374: } catch (Exception ex) {
375: System.err.println("Error receiving message");
376: ex.printStackTrace();
377: }
378: }
379:
380: /**
381: * Doesn't actually send a message, but published an object that
382: * causes a message to be sent.
383: */
384: protected void sendMessage(MessageAddress address, String message) {
385: if (log != null && log.isDebugEnabled())
386: log.debug("AggPlugins:(" + me + "):sendMessage from: "
387: + getAgentIdentifier() + " to "
388: + address.getAddress());
389: XMLMessage msg = new XMLMessage(message);
390: AggRelay relay = new AggRelay(getUIDService().nextUID(), me,
391: address, msg, null);
392: // I need to flag this relay as one that I created, so I don't try to service it, too.
393: relay.setLocal(true);
394: getBlackboardService().publishAdd(relay);
395: if (log != null && log.isDebugEnabled())
396: log.debug("AggPlugins:(" + me
397: + "):sendMessage: done publishized it");
398: }
399:
400: protected static final MessageAddress createAggAddress(
401: String agentName) {
402: return MessageAddress.getMessageAddress(agentName);
403: }
404:
405: private static class QueryRAFinder implements UnaryPredicate {
406: String queryId = null;
407:
408: public QueryRAFinder(String queryId) {
409: this .queryId = queryId;
410: }
411:
412: public boolean execute(Object o) {
413: if (o instanceof QueryResultAdapter) {
414: QueryResultAdapter qra = (QueryResultAdapter) o;
415: return qra.checkID(queryId);
416: }
417: return false;
418: }
419: }
420:
421: private class PullTimerTask extends TimerTask {
422: private QueryResultAdapter qra;
423:
424: public PullTimerTask(QueryResultAdapter qra) {
425: this .qra = qra;
426: }
427:
428: public void run() {
429: String reqStr = frameRequestXml("update_request", qra
430: .getID(), null, true, null);
431:
432: Enumeration sources = qra.getQuery().getSourceClusters();
433: while (sources.hasMoreElements())
434: sendMessage(createAggAddress((String) sources
435: .nextElement()), reqStr);
436: }
437: }
438:
439: protected LoggingService log = null;
440:
441: /** Holds value of property UIDService. */
442: private UIDService UIDService;
443:
444: public void setLoggingService(LoggingService ls) {
445: if ((ls == null) && (log != null) && log.isDebugEnabled())
446: log.debug("Logger (" + me + ")being reset to null");
447: log = ls;
448: if ((log != null) && log.isDebugEnabled())
449: log.debug("Logging (" + me + ")initialized");
450: }
451:
452: public LoggingService getLoggingService() {
453: return log;
454: }
455:
456: /** Getter for property UIDService.
457: * @return Value of property UIDService.
458: */
459: public UIDService getUIDService() {
460: return this .UIDService;
461: }
462:
463: /** Setter for property UIDService.
464: * @param UIDService New value of property UIDService.
465: */
466: public void setUIDService(UIDService UIDService) {
467: this.UIDService = UIDService;
468: }
469:
470: }
|