/*
 * <copyright>
 *
 * Copyright 2003-2004 BBNT Solutions, LLC
 * under sponsorship of the Defense Advanced Research Projects
 * Agency (DARPA).
 *
 * You can redistribute this software and/or modify it under the
 * terms of the Cougaar Open Source License as published on the
 * Cougaar Open Source Website (www.cougaar.org).
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * </copyright>
 */
package org.cougaar.lib.aggagent.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.TimerTask;

import org.cougaar.lib.aggagent.query.UpdateListener;
import org.cougaar.lib.aggagent.query.UpdateObservable;
import org.cougaar.lib.aggagent.util.Const;
import org.cougaar.lib.aggagent.util.XmlUtils;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
045:
046: /**
047: * Abstract base class for result set and alert monitors. Provides support
048: * for both periodic pull monitoring as well as keep alive server push
049: * monitoring.
050: *
051: * Maintains a collection of monitored objects and keeps them updated based
052: * on changes on the aggregation agent's blackboard. To react to these
053: * changes either:
054: * <UL>
055: * <LI>add update listener(s) to the monitor class and receive events for
056: * changes to all monitored objects or</LI>
057: * <LI>add update listener(s) to 'live' objects returned by monitor and
058: * receive events only for those objects</LI>
059: * </UL>
060: */
061: abstract class Monitor {
062: /**
063: * PULL_METHOD is an update method in which the client periodically pulls
064: * incremental updates from passive session on aggregation agent. A new
065: * connection is created with each pull.
066: */
067: public static final int PULL_METHOD = 0;
068:
069: /**
070: * KEEP_ALIVE_METHOD is an update method in which the client creates a keep
071: * alive session with aggregation agent. Incremental updates are pushed to
072: * the client over this pipe.
073: */
074: public static final int KEEP_ALIVE_METHOD = 1;
075:
076: private Object lock = new Object();
077: private boolean notifyKeepAliveExit = false;
078: private int updateMethod;
079: private boolean monitorAllObjects = false;
080: private HashMap monitoredObjectMap = new HashMap();
081: private String serverURL = null;
082: private String monitorTag = null;
083: private UpdateObservable updateObservable = new UpdateObservable();
084:
085: private String passiveSessionKey = null;
086: private TimerTask pullTask = new TimerTask() {
087: public synchronized void run() {
088: if (passiveSessionKey != null) {
089: String updateURL = serverURL
090: + "&REQUEST_UPDATE=1&SESSION_ID="
091: + passiveSessionKey;
092: Element root = XmlUtils.requestXML(updateURL, null);
093: updateMonitoredObjects(root);
094: }
095: }
096: };
097:
098: private volatile Thread keepAliveThread = null;
099: private Runnable keepAliveTask = new Runnable() {
100: public void run() {
101: boolean sessionIdTag = true;
102: String sessionId = null;
103: InputStream i = null;
104: String monitorRequest = createMonitorRequest();
105: if (monitorRequest == null)
106: return;
107:
108: // set up keep alive connection
109: try {
110: URL url = new URL(serverURL + "&KEEP_ALIVE_MONITOR=1");
111: URLConnection conn = url.openConnection();
112: ((HttpURLConnection) conn).setRequestMethod("PUT");
113: conn.setDoOutput(true);
114: conn.setDoInput(true);
115:
116: // send request
117: PrintStream servicePrint = new PrintStream(conn
118: .getOutputStream());
119: servicePrint.println(monitorRequest);
120:
121: // get updates
122: i = conn.getInputStream();
123: Thread this Thread = Thread.currentThread();
124: int formFeed = (int) '\f';
125: while (keepAliveThread == this Thread) {
126: StringBuffer updateMessage = new StringBuffer();
127: int c;
128: while (((c = i.read()) != formFeed) && (c != -1)
129: && (keepAliveThread == this Thread)) {
130: updateMessage.append((char) c);
131: }
132:
133: if (c == -1) {
134: break;
135: }
136:
137: if (c == formFeed) {
138: if (sessionIdTag) {
139: // session id is sent first
140: sessionIdTag = false;
141: Element root = XmlUtils.parse(updateMessage
142: .toString());
143: sessionId = root.getAttribute("id");
144: } else if (!updateMessage.toString().equals(
145: Const.KEEP_ALIVE_ACK_MESSAGE)) {
146: Element root = XmlUtils.parse(updateMessage
147: .toString());
148: updateMonitoredObjects(root);
149: }
150: }
151: }
152: } catch (Exception e) {
153: e.printStackTrace();
154: System.out.println("Error reading from keep alive.\n"
155: + "Exiting monitor with request:\n"
156: + monitorRequest);
157: } finally {
158: // send message to servlet to cancel keep alive
159: if (sessionId != null) {
160: String cancelSessionURL = serverURL
161: + "&CANCEL_SESSION_ID=" + sessionId;
162: XmlUtils.requestString(cancelSessionURL, null);
163: }
164:
165: // ensure that nothing is left unread
166: try {
167: while (i.read() != -1) {
168: }
169: } catch (Exception e) {
170: e.printStackTrace();
171: }
172:
173: // notify canceler that deed has been done
174: synchronized (lock) {
175: if (notifyKeepAliveExit) {
176: notifyKeepAliveExit = false;
177: lock.notify();
178: }
179: }
180: }
181: }
182: };
183:
184: /**
185: * Create a new monitor to monitor a set of objects on the aggregation
186: * agent. Each monitor is used to monitor a single type of object
187: * (e.g. AlertMonitor, ResultSetMonitor).
188: *
189: * @param serverURL aggregation agent cluster's text URL
190: * @param monitorTag magic text string used to tell aggregation PSP what
191: * type of objects are being monitored.
192: * (e.g. "alert", "result_set")
193: * @param updateMethod method used to keep monitored objects updated
194: * PULL_METHOD - periodically pull incremental updates
195: * from passive session on aggregation
196: * agent. Create new connection with
197: * each pull.
198: * KEEP_ALIVE_METHOD - create keep alive session
199: * with aggregation agent. Incremental
200: * updates are pushed to the client
201: * over this pipe.
202: */
203: public Monitor(String serverURL, String monitorTag, int updateMethod) {
204: this .serverURL = serverURL;
205: this .monitorTag = monitorTag;
206: this .updateMethod = updateMethod;
207: }
208:
209: /**
210: * Change mode to monitor all objects on the aggregation agent that are of
211: * the type that this monitor handles. Without calling this method, only
212: * a defined set of objects are monitored (see monitorObject method).
213: */
214: public void monitorAllObjects() {
215: monitorAllObjects = true;
216: cancelUpdateSession();
217: createUpdateSession();
218: }
219:
220: /**
221: * Add an update listener to observe all monitored objects. This is
222: * roughly equivalent to adding an update listener to each of the
223: * currently monitored objects. But, when monitor all objects is turned
224: * on, this can also be used to discover newly added objects on the
225: * aggregation agent's blackboard (via objectAdded listener call).
226: *
227: * @param ul update listener to add to entire monitor.
228: */
229: public void addUpdateListener(UpdateListener ul) {
230: updateObservable.addUpdateListener(ul);
231: }
232:
233: /**
234: * Remove an update listener such that it no longer gets notified of
235: * changes to monitored objects.
236: *
237: * @param ul update listener to remove from monitor.
238: */
239: public void removeUpdateListener(UpdateListener ul) {
240: updateObservable.removeUpdateListener(ul);
241: }
242:
243: /**
244: * Returns a collection of all 'live' objects currently being updated by
245: * this monitor.
246: *
247: * @return a collection of all 'live' objects currently being updated by
248: * this monitor.
249: */
250: public Collection getMonitoredObjects() {
251: return monitoredObjectMap.values();
252: }
253:
254: /**
255: * Returns true if an object matching the given identifier is currently
256: * being updated by this monitor.
257: *
258: * @param identifier an object that uniquely identifies an object on the
259: * aggregation agent. Must be able to use this object
260: * as a hashtable key (i.e. must have proper equals()
261: * and hashcode() methods).
262: *
263: * @return true if an object matching the given identifier is currently
264: * being updated by this monitor.
265: */
266: public boolean isMonitoring(Object identifier) {
267: return monitoredObjectMap.containsKey(identifier);
268: }
269:
270: /**
271: * Get the timer task to use to periodically pull incremental updates from
272: * the aggregation agent.
273: *
274: * @return the timer task to use to periodically pull incremental updates
275: * from the aggregation agent. Returns null if monitor is not configured to
276: * use the pull update method.
277: */
278: public TimerTask getPullTask() {
279: return (updateMethod == PULL_METHOD) ? pullTask : null;
280: }
281:
282: /**
283: * Cancel this monitor and any overhead associated with it.
284: *
285: * @return true, if successful.
286: */
287: public boolean cancel() {
288: if (updateMethod == PULL_METHOD) {
289: boolean r = pullTask.cancel();
290: cancelPassiveSession();
291: return r;
292: } else if ((updateMethod == KEEP_ALIVE_METHOD)
293: && (keepAliveThread != null)) {
294: synchronized (lock) {
295: notifyKeepAliveExit = true;
296: keepAliveThread = null; // will end keep alive thread
297:
298: // wait for thread to end
299: try {
300: lock.wait(10000);
301: } catch (InterruptedException e) {
302: }
303: }
304:
305: return true;
306: }
307: return false;
308: }
309:
310: /**
311: * Must be defined by subclasses to provide a xml representation of a
312: * given identifier.
313: *
314: * @param identifier an object that uniquely identifies an object on the
315: * aggregation agent. Must be able to use this object
316: * as a hashtable key (i.e. must have proper equals()
317: * and hashcode() methods).
318: *
319: * @return a xml representation of given identifier.
320: */
321: protected abstract String createIdTag(Object identifier);
322:
323: /**
324: * Must be defined by subclasses to define what should be done when an
325: * update event (either add or change) is reported by the aggregation agent
326: * to a object described by the given xml element tree.
327: *
328: * @param monitoredElement xml element tree that describes the updated
329: * monitored object.
330: *
331: * @return a live object updated based on the given xml
332: */
333: protected abstract Object update(Element monitoredElement);
334:
335: /**
336: * Must be defined by subclasses to define what should be done when a
337: * remove event is reported by the aggregation agent to a object described
338: * by the given xml element tree.
339: *
340: * @param monitoredElement xml element tree that describes the removed
341: * monitored object.
342: *
343: * @return previously live object that was removed.
344: */
345: protected abstract Object remove(Element monitoredElement);
346:
347: /**
348: * Monitor a new object. If object matching identifier is already being
349: * monitored, existing live object is returned. Otherwise, passed in
350: * object becomes live.
351: *
352: * @param identifier an object that uniquely identifies an object on the
353: * aggregation agent. Must be able to use this object
354: * as a hashtable key (i.e. must have proper equals()
355: * and hashcode() methods).
356: * @param monitoredObj a valid object for this type of monitor.
357: *
358: * @return a live object that is actively being updated to match a subject
359: * object on the aggregation agent.
360: */
361: protected Object monitorObject(Object identifier,
362: Object monitoredObj) {
363: Object existingMonitoredObj = getMonitoredObject(identifier);
364: if (existingMonitoredObj == null) {
365: if (!monitorAllObjects)
366: cancelUpdateSession();
367: monitoredObjectMap.put(identifier, monitoredObj);
368: existingMonitoredObj = monitoredObj;
369: if (!monitorAllObjects)
370: createUpdateSession();
371: }
372: return existingMonitoredObj;
373: }
374:
375: /**
376: * Remove this object from the set of objects being monitored. This
377: * method has a negligible effect if monitor-all is turned on
378: * (old live object will die, but new one will take it's place if that
379: * object is still on the log plan).
380: *
381: * @param identifier an object that uniquely identifies an object on the
382: * aggregation agent. Must be able to use this object
383: * as a hashtable key (i.e. must have proper equals()
384: * and hashcode() methods).
385: *
386: * @return previously live object that was removed.
387: */
388: protected Object stopMonitoringObject(Object identifier) {
389: if (!monitorAllObjects)
390: cancelUpdateSession();
391: Object removedObject = monitoredObjectMap.remove(identifier);
392: if (!monitorAllObjects)
393: createUpdateSession();
394: return removedObject;
395: }
396:
397: /**
398: * Get a specific object being updated by this monitor.
399: *
400: * @param identifier an object that uniquely identifies an object on the
401: * aggregation agent. Must be able to use this object
402: * as a hashtable key (i.e. must have proper equals()
403: * and hashcode() methods).
404: *
405: * @return a live object that is actively being updated to match a subject
406: * object on the aggregation agent.
407: */
408: protected Object getMonitoredObject(Object identifier) {
409: return monitoredObjectMap.get(identifier);
410: }
411:
412: private void cancelUpdateSession() {
413: if (updateMethod == PULL_METHOD) {
414: cancelPassiveSession();
415: } else {
416: keepAliveThread = null; // flag thread to exit
417: }
418: }
419:
420: private void createUpdateSession() {
421: if (updateMethod == PULL_METHOD) {
422: requestPassiveSession();
423: pullTask.run();
424: } else if (updateMethod == KEEP_ALIVE_METHOD) {
425: keepAliveThread = new Thread(keepAliveTask);
426: keepAliveThread.start();
427: }
428: }
429:
430: private String createMonitorRequest() {
431: String monitorRequest = null;
432: if (!monitoredObjectMap.isEmpty() || monitorAllObjects) {
433: StringBuffer s = new StringBuffer(
434: "<monitor_session type=\"");
435: s.append(monitorTag);
436: s.append("\">\n");
437: if (monitorAllObjects) {
438: s.append("<monitor_all />\n");
439: } else {
440: for (Iterator i = monitoredObjectMap.keySet()
441: .iterator(); i.hasNext();) {
442: s.append(createIdTag(i.next()));
443: }
444: }
445: s.append("</monitor_session>");
446: monitorRequest = s.toString();
447: }
448: return monitorRequest;
449: }
450:
451: private String requestPassiveSession() {
452: String passiveSessionRequest = createMonitorRequest();
453: if (passiveSessionRequest == null)
454: return null;
455:
456: // Request a passive session on Aggregation Agent
457: String loadedURL = serverURL + "&CREATE_PASSIVE_SESSION=1";
458: passiveSessionKey = XmlUtils.requestString(loadedURL,
459: passiveSessionRequest);
460: return passiveSessionKey;
461: }
462:
463: private String cancelPassiveSession() {
464: String response = null;
465: if (passiveSessionKey != null) {
466: // Cancel passive session on Aggregation Agent
467: String loadedURL = serverURL
468: + "&CANCEL_PASSIVE_SESSION=1&SESSION_ID="
469: + passiveSessionKey;
470: response = XmlUtils.requestString(loadedURL, null);
471: passiveSessionKey = null;
472: }
473: return response;
474: }
475:
476: private void updateMonitoredObjects(Element incrementalUpdate) {
477: // update result set based on incremental change xml
478: NodeList nl = incrementalUpdate.getChildNodes();
479: for (int i = 0; i < nl.getLength(); i++) {
480: Node n = nl.item(i);
481: if (n.getNodeType() == Node.ELEMENT_NODE) {
482: Element child = (Element) n;
483: String s = child.getNodeName();
484: if (s.equals("added")) {
485: addAll(child);
486: }
487: if (s.equals("changed")) {
488: changeAll(child);
489: } else if (s.equals("removed")) {
490: removeAll(child);
491: }
492: }
493: }
494: }
495:
496: private void addAll(Element monitoredObjectsParent) {
497: NodeList nl = monitoredObjectsParent
498: .getElementsByTagName(monitorTag);
499: for (int i = 0; i < nl.getLength(); i++) {
500: Object updatedObject = update((Element) nl.item(i));
501: updateObservable.fireObjectAdded(updatedObject);
502: }
503: }
504:
505: private void changeAll(Element monitoredObjectsParent) {
506: NodeList nl = monitoredObjectsParent
507: .getElementsByTagName(monitorTag);
508: for (int i = 0; i < nl.getLength(); i++) {
509: Object updatedObject = update((Element) nl.item(i));
510: updateObservable.fireObjectChanged(updatedObject);
511: }
512: }
513:
514: private void removeAll(Element monitoredObjectsParent) {
515: NodeList nl = monitoredObjectsParent
516: .getElementsByTagName(monitorTag);
517: for (int i = 0; i < nl.getLength(); i++) {
518: Object removedObject = remove((Element) nl.item(i));
519: updateObservable.fireObjectRemoved(removedObject);
520: }
521: }
522: }
|