001: /*
002: * <copyright>
003: *
004: * Copyright 2003-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.lib.aggagent.servlet;
027:
028: import java.io.PrintWriter;
029: import java.util.Collection;
030: import java.util.Enumeration;
031: import java.util.Iterator;
032: import java.util.LinkedList;
033: import java.util.Set;
034:
035: import javax.servlet.http.HttpServletRequest;
036:
037: import org.cougaar.core.blackboard.IncrementalSubscription;
038: import org.cougaar.core.blackboard.Subscription;
039: import org.cougaar.core.service.BlackboardService;
040: import org.cougaar.core.service.UIDService;
041: import org.cougaar.lib.aggagent.query.AggregationQuery;
042: import org.cougaar.lib.aggagent.query.AggregationResultSet;
043: import org.cougaar.lib.aggagent.query.Alert;
044: import org.cougaar.lib.aggagent.query.QueryResultAdapter;
045: import org.cougaar.util.UnaryPredicate;
046:
047: public abstract class AggregationServletInterface {
048: private BlackboardService blackboard;
049: private SubscriptionMonitorSupport subscriptionMonitorSupport;
050: private long timeoutDefault = 0; // this can be overridden by argument to waitForAndReturnResults
051: private UIDService uidService = null;
052:
053: public AggregationServletInterface(BlackboardService blackboard,
054: SubscriptionMonitorSupport subscriptionMonitorSupport,
055: UIDService uidService) {
056: this .blackboard = blackboard;
057: this .subscriptionMonitorSupport = subscriptionMonitorSupport;
058: this .uidService = uidService;
059: String timeoutStr = System
060: .getProperty("org.cougaar.lib.aggagent.timeout");
061: if (timeoutStr != null) {
062: try {
063: timeoutDefault = Long.parseLong(timeoutStr);
064: } catch (NumberFormatException nfe) {
065: System.err
066: .println("WARNING: Received invalid number for org.cougaar.lib.aggagent.timeout: "
067: + timeoutStr);
068: timeoutDefault = 0;
069: }
070: }
071: }
072:
073: protected UIDService getUIDService() {
074: return uidService;
075: }
076:
077: public abstract void handleRequest(PrintWriter out,
078: HttpServletRequest request);
079:
080: private static abstract class ClassSeeker implements UnaryPredicate {
081: private Class targetClass;
082: private Collection identifiers = null;
083:
084: public ClassSeeker(Class targetClass) {
085: this .targetClass = targetClass;
086: }
087:
088: public ClassSeeker(Class targetClass, Object identifier) {
089: this (targetClass);
090: identifiers = new LinkedList();
091: identifiers.add(identifier);
092: }
093:
094: public ClassSeeker(Class targetClass, Collection identifiers) {
095: this (targetClass);
096: this .identifiers = identifiers;
097: }
098:
099: public boolean execute(Object o) {
100: if (targetClass.isInstance(o)) {
101: if (identifiers == null)
102: return true;
103:
104: for (Iterator i = identifiers.iterator(); i.hasNext();) {
105: if (isMatch(i.next(), o))
106: return true;
107: }
108: }
109:
110: return false;
111: }
112:
113: protected abstract boolean isMatch(Object identifier,
114: Object planObject);
115: }
116:
117: // used to get all queries, a set of queries or a single, specified query
118: protected static class QuerySeeker extends ClassSeeker {
119: public QuerySeeker() {
120: super (QueryResultAdapter.class);
121: }
122:
123: public QuerySeeker(String queryId) {
124: super (QueryResultAdapter.class, queryId);
125: }
126:
127: public QuerySeeker(Collection queryIds) {
128: super (QueryResultAdapter.class, queryIds);
129: }
130:
131: protected boolean isMatch(Object identifier, Object planObject) {
132: String queryId = (String) identifier;
133: QueryResultAdapter qra = (QueryResultAdapter) planObject;
134: return qra.checkID(queryId);
135: }
136: }
137:
138: // used to uniquely identify an alert
139: protected static class AlertIdentifier {
140: public String queryId = null;
141: public String alertName = null;
142:
143: public AlertIdentifier(String queryId, String alertName) {
144: this .queryId = queryId;
145: this .alertName = alertName;
146: }
147:
148: public boolean equals(Object o) {
149: if (o instanceof AlertIdentifier) {
150: AlertIdentifier ai = (AlertIdentifier) o;
151: return queryId.equals(ai.queryId)
152: && alertName.equals(ai.alertName);
153: }
154: return false;
155: }
156: }
157:
158: // look for alerts on the logplan with this UnaryPredicate class.
159: protected static class AlertSeeker extends ClassSeeker {
160: public AlertSeeker() {
161: super (Alert.class);
162: }
163:
164: public AlertSeeker(AlertIdentifier ai) {
165: super (Alert.class, ai);
166: }
167:
168: public AlertSeeker(Collection c) {
169: super (Alert.class, c);
170: }
171:
172: protected boolean isMatch(Object identifier, Object planObject) {
173: AlertIdentifier ai = (AlertIdentifier) identifier;
174: Alert alert = (Alert) planObject;
175: return (alert.getQueryAdapter().checkID(ai.queryId) && alert
176: .getName().equals(ai.alertName));
177: }
178: }
179:
180: protected void waitForAndReturnResults(String queryId,
181: PrintWriter out, boolean xml) {
182: waitForAndReturnResults(queryId, out, xml, timeoutDefault);
183: }
184:
185: protected void waitForAndReturnResults(String queryId,
186: PrintWriter out, boolean xml, final long timeout) {
187: class ChangeListener implements SubscriptionListener {
188: public QueryResultAdapter changedQra = null;
189:
190: public synchronized void waitForChange() {
191: try {
192: this .wait(timeout);
193: } catch (InterruptedException bla) {
194: }
195: }
196:
197: public synchronized void subscriptionChanged(Subscription s) {
198: Enumeration changedList = ((IncrementalSubscription) s)
199: .getChangedList();
200: if (changedList.hasMoreElements()) {
201: changedQra = (QueryResultAdapter) changedList
202: .nextElement();
203: if (changedQra.allClustersResponded()) {
204: //System.out.println("All clusters checked in");
205: this .notify();
206: } else {
207: //System.out.println("Still waiting for some clusters");
208: }
209: }
210: }
211:
212: }
213:
214: ChangeListener cl = new ChangeListener();
215: UnaryPredicate queryMonitor = new QuerySeeker(queryId);
216: Subscription s = null;
217: try {
218: blackboard.openTransaction();
219: s = blackboard.subscribe(queryMonitor);
220: subscriptionMonitorSupport.setSubscriptionListener(s, cl);
221: } finally {
222: synchronized (cl) {
223: // synchronized (cl) is necessary in the rare case that the agg is done
224: // before I get to the waitForChange()
225: blackboard.closeTransaction();
226: // wait for a publish change event on the query result adapter
227: // and then get and return result set
228: cl.waitForChange();
229: }
230: }
231:
232: // It's possible that we never heard any updates on the QRA. If so, retrieve it
233: // from the Blackboard now.
234: if (cl.changedQra == null) {
235: cl.changedQra = (QueryResultAdapter) ((IncrementalSubscription) s)
236: .first();
237: }
238:
239: // Set any unresponding agents as exceptions on the result set
240: Set responded = cl.changedQra.getRawResultSet()
241: .getRespondingClusters();
242: Enumeration en = cl.changedQra.getQuery().getSourceClusters();
243: AggregationResultSet results = cl.changedQra.getResultSet();
244: while (en.hasMoreElements()) {
245: String clusterID = (String) en.nextElement();
246: if (!responded.contains(clusterID))
247: results
248: .setException(
249: clusterID,
250: "Agent "
251: + clusterID
252: + " did not respond to query before timeout occurred");
253: }
254:
255: unsubscribe(s);
256: if (xml) {
257: out.println(results.toXml());
258: } else {
259: printQueryReportPage(cl.changedQra, out);
260: }
261: removeQuery(cl.changedQra);
262: }
263:
264: // Remove the query from the logplan as well as any Alerts that depend on it
265: protected void removeQuery(QueryResultAdapter q) {
266: try {
267: blackboard.openTransaction();
268: for (Iterator i = q.getAlerts(); i.hasNext();)
269: blackboard.publishRemove(i.next());
270: blackboard.publishRemove(q);
271: } catch (Exception e) {
272: e.printStackTrace();
273: } finally {
274: blackboard.closeTransaction();
275: }
276: }
277:
278: // Update the query on the blackboard using the given Agg Query
279: protected void updateQuery(QueryResultAdapter q,
280: AggregationQuery newQuery) {
281: q.updateClusters(newQuery.getSourceClustersVector());
282: try {
283: blackboard.openTransaction();
284: blackboard.publishChange(q);
285: } catch (Exception e) {
286: e.printStackTrace();
287: } finally {
288: blackboard.closeTransaction();
289: }
290: }
291:
292: /**
293: * Get QueryResultAdapter for given query id;
294: * return null if query is not found.
295: */
296: protected QueryResultAdapter findQuery(String queryId) {
297: Iterator qras = query(new QuerySeeker(queryId)).iterator();
298: return qras.hasNext() ? (QueryResultAdapter) qras.next() : null;
299: }
300:
301: // used to be "removeQuery" --
302: // find the query matching the specified ID and remove it from the logplan
303: protected void findAndRemoveQuery(String queryId) {
304: // find query adapter on log plan
305: blackboard.openTransaction();
306: Collection qs = blackboard.query(new QuerySeeker(queryId));
307: blackboard.closeTransaction();
308: Iterator qi = qs.iterator();
309:
310: // remove all queries matching query id
311: while (qi.hasNext())
312: removeQuery((QueryResultAdapter) qi.next());
313: }
314:
315: // find the query matching the specified ID and update it on the logplan
316: protected void findAndUpdateQuery(String queryId,
317: AggregationQuery aq) {
318: // find query adapter on log plan
319: blackboard.openTransaction();
320: Collection qs = blackboard.query(new QuerySeeker(queryId));
321: blackboard.closeTransaction();
322: Iterator qi = qs.iterator();
323:
324: // remove all queries matching query id
325: while (qi.hasNext()) {
326: updateQuery((QueryResultAdapter) qi.next(), aq);
327: }
328: }
329:
330: protected void printQueryReportPage(QueryResultAdapter qra,
331: PrintWriter out) {
332: out.println("<CENTER>");
333: out.println(HTMLPresenter.toHTML(qra));
334: out.println("</CENTER>");
335: }
336:
337: // for access to the blackboard
338:
339: protected Collection query(UnaryPredicate predicate) {
340: Collection ret;
341: blackboard.openTransaction();
342: ret = blackboard.query(predicate);
343: blackboard.closeTransaction();
344: return ret;
345:
346: }
347:
348: protected void publishAdd(Object o) {
349: try {
350: blackboard.openTransaction();
351: blackboard.publishAdd(o);
352: } finally {
353: blackboard.closeTransactionDontReset();
354: }
355: }
356:
357: protected void publishChange(Object o) {
358: try {
359: blackboard.openTransaction();
360: blackboard.publishChange(o);
361: } finally {
362: blackboard.closeTransactionDontReset();
363: }
364: }
365:
366: protected void publishRemove(Object o) {
367: try {
368: blackboard.openTransaction();
369: blackboard.publishRemove(o);
370: } finally {
371: blackboard.closeTransactionDontReset();
372: }
373: }
374:
375: private void unsubscribe(Subscription subscription) {
376: try {
377: blackboard.openTransaction();
378: blackboard.unsubscribe(subscription);
379: } finally {
380: blackboard.closeTransactionDontReset();
381: }
382: }
383: }
|