Source Code Cross Referenced for Monitor.java in  » Science » Cougaar12_4 » org » cougaar » lib » aggagent » client » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.lib.aggagent.client 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * <copyright>
003:         *  
004:         *  Copyright 2003-2004 BBNT Solutions, LLC
005:         *  under sponsorship of the Defense Advanced Research Projects
006:         *  Agency (DARPA).
007:         * 
008:         *  You can redistribute this software and/or modify it under the
009:         *  terms of the Cougaar Open Source License as published on the
010:         *  Cougaar Open Source Website (www.cougaar.org).
011:         * 
012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023:         *  
024:         * </copyright>
025:         */
026:        package org.cougaar.lib.aggagent.client;
027:
028:        import java.io.InputStream;
029:        import java.io.PrintStream;
030:        import java.net.HttpURLConnection;
031:        import java.net.URL;
032:        import java.net.URLConnection;
033:        import java.util.Collection;
034:        import java.util.HashMap;
035:        import java.util.Iterator;
036:        import java.util.TimerTask;
037:
038:        import org.cougaar.lib.aggagent.query.UpdateListener;
039:        import org.cougaar.lib.aggagent.query.UpdateObservable;
040:        import org.cougaar.lib.aggagent.util.Const;
041:        import org.cougaar.lib.aggagent.util.XmlUtils;
042:        import org.w3c.dom.Element;
043:        import org.w3c.dom.Node;
044:        import org.w3c.dom.NodeList;
045:
046:        /**
047:         * Abstract base class for result set and alert monitors.  Provides support
048:         * for both periodic pull monitoring as well as keep alive server push
049:         * monitoring.
050:         *
051:         * Maintains a collection of monitored objects and keeps them updated based
052:         * on changes on the aggregation agent's blackboard.  To react to these
053:         * changes either:
054:         * <UL>
055:         * <LI>add update listener(s) to the monitor class and receive events for
056:         *     changes to all monitored objects or</LI>
057:         * <LI>add update listener(s) to 'live' objects returned by monitor and
058:         *     receive events only for those objects</LI>
059:         * </UL>
060:         */
061:        abstract class Monitor {
062:            /**
063:             * PULL_METHOD is an update method in which the client periodically pulls
064:             * incremental updates from passive session on aggregation agent.  A new
065:             * connection is created with each pull.
066:             */
067:            public static final int PULL_METHOD = 0;
068:
069:            /**
070:             * KEEP_ALIVE_METHOD is an update method in which the client creates a keep
071:             * alive session with aggregation agent.  Incremental updates are pushed to
072:             * the client over this pipe.
073:             */
074:            public static final int KEEP_ALIVE_METHOD = 1;
075:
076:            private Object lock = new Object();
077:            private boolean notifyKeepAliveExit = false;
078:            private int updateMethod;
079:            private boolean monitorAllObjects = false;
080:            private HashMap monitoredObjectMap = new HashMap();
081:            private String serverURL = null;
082:            private String monitorTag = null;
083:            private UpdateObservable updateObservable = new UpdateObservable();
084:
085:            private String passiveSessionKey = null;
086:            private TimerTask pullTask = new TimerTask() {
087:                public synchronized void run() {
088:                    if (passiveSessionKey != null) {
089:                        String updateURL = serverURL
090:                                + "&REQUEST_UPDATE=1&SESSION_ID="
091:                                + passiveSessionKey;
092:                        Element root = XmlUtils.requestXML(updateURL, null);
093:                        updateMonitoredObjects(root);
094:                    }
095:                }
096:            };
097:
098:            private volatile Thread keepAliveThread = null;
099:            private Runnable keepAliveTask = new Runnable() {
100:                public void run() {
101:                    boolean sessionIdTag = true;
102:                    String sessionId = null;
103:                    InputStream i = null;
104:                    String monitorRequest = createMonitorRequest();
105:                    if (monitorRequest == null)
106:                        return;
107:
108:                    // set up keep alive connection
109:                    try {
110:                        URL url = new URL(serverURL + "&KEEP_ALIVE_MONITOR=1");
111:                        URLConnection conn = url.openConnection();
112:                        ((HttpURLConnection) conn).setRequestMethod("PUT");
113:                        conn.setDoOutput(true);
114:                        conn.setDoInput(true);
115:
116:                        // send request
117:                        PrintStream servicePrint = new PrintStream(conn
118:                                .getOutputStream());
119:                        servicePrint.println(monitorRequest);
120:
121:                        // get updates
122:                        i = conn.getInputStream();
123:                        Thread this Thread = Thread.currentThread();
124:                        int formFeed = (int) '\f';
125:                        while (keepAliveThread == this Thread) {
126:                            StringBuffer updateMessage = new StringBuffer();
127:                            int c;
128:                            while (((c = i.read()) != formFeed) && (c != -1)
129:                                    && (keepAliveThread == this Thread)) {
130:                                updateMessage.append((char) c);
131:                            }
132:
133:                            if (c == -1) {
134:                                break;
135:                            }
136:
137:                            if (c == formFeed) {
138:                                if (sessionIdTag) {
139:                                    // session id is sent first
140:                                    sessionIdTag = false;
141:                                    Element root = XmlUtils.parse(updateMessage
142:                                            .toString());
143:                                    sessionId = root.getAttribute("id");
144:                                } else if (!updateMessage.toString().equals(
145:                                        Const.KEEP_ALIVE_ACK_MESSAGE)) {
146:                                    Element root = XmlUtils.parse(updateMessage
147:                                            .toString());
148:                                    updateMonitoredObjects(root);
149:                                }
150:                            }
151:                        }
152:                    } catch (Exception e) {
153:                        e.printStackTrace();
154:                        System.out.println("Error reading from keep alive.\n"
155:                                + "Exiting monitor with request:\n"
156:                                + monitorRequest);
157:                    } finally {
158:                        // send message to servlet to cancel keep alive
159:                        if (sessionId != null) {
160:                            String cancelSessionURL = serverURL
161:                                    + "&CANCEL_SESSION_ID=" + sessionId;
162:                            XmlUtils.requestString(cancelSessionURL, null);
163:                        }
164:
165:                        // ensure that nothing is left unread
166:                        try {
167:                            while (i.read() != -1) {
168:                            }
169:                        } catch (Exception e) {
170:                            e.printStackTrace();
171:                        }
172:
173:                        // notify canceler that deed has been done
174:                        synchronized (lock) {
175:                            if (notifyKeepAliveExit) {
176:                                notifyKeepAliveExit = false;
177:                                lock.notify();
178:                            }
179:                        }
180:                    }
181:                }
182:            };
183:
184:            /**
185:             * Create a new monitor to monitor a set of objects on the aggregation
186:             * agent.  Each monitor is used to monitor a single type of object
187:             * (e.g. AlertMonitor, ResultSetMonitor).
188:             *
189:             * @param serverURL    aggregation agent cluster's text URL
190:             * @param monitorTag   magic text string used to tell aggregation PSP what
191:             *                     type of objects are being monitored.
192:             *                     (e.g. "alert", "result_set")
193:             * @param updateMethod method used to keep monitored objects updated
194:             *                     PULL_METHOD - periodically pull incremental updates
195:             *                                   from passive session on aggregation
196:             *                                   agent.  Create new connection with
197:             *                                   each pull.
198:             *                      KEEP_ALIVE_METHOD - create keep alive session
199:             *                                   with aggregation agent.  Incremental
200:             *                                   updates are pushed to the client
201:             *                                   over this pipe.
202:             */
203:            public Monitor(String serverURL, String monitorTag, int updateMethod) {
204:                this .serverURL = serverURL;
205:                this .monitorTag = monitorTag;
206:                this .updateMethod = updateMethod;
207:            }
208:
209:            /**
210:             * Change mode to monitor all objects on the aggregation agent that are of
211:             * the type that this monitor handles.  Without calling this method, only
212:             * a defined set of objects are monitored (see monitorObject method).
213:             */
214:            public void monitorAllObjects() {
215:                monitorAllObjects = true;
216:                cancelUpdateSession();
217:                createUpdateSession();
218:            }
219:
220:            /**
221:             * Add an update listener to observe all monitored objects.  This is
222:             * roughly equivalent to adding an update listener to each of the
223:             * currently monitored objects.  But, when monitor all objects is turned
224:             * on, this can also be used to discover newly added objects on the
225:             * aggregation agent's blackboard (via objectAdded listener call).
226:             *
227:             * @param ul update listener to add to entire monitor.
228:             */
229:            public void addUpdateListener(UpdateListener ul) {
230:                updateObservable.addUpdateListener(ul);
231:            }
232:
233:            /**
234:             * Remove an update listener such that it no longer gets notified of
235:             * changes to monitored objects.
236:             *
237:             * @param ul update listener to remove from monitor.
238:             */
239:            public void removeUpdateListener(UpdateListener ul) {
240:                updateObservable.removeUpdateListener(ul);
241:            }
242:
243:            /**
244:             * Returns a collection of all 'live' objects currently being updated by
245:             * this monitor.
246:             *
247:             * @return a collection of all 'live' objects currently being updated by
248:             *         this monitor.
249:             */
250:            public Collection getMonitoredObjects() {
251:                return monitoredObjectMap.values();
252:            }
253:
254:            /**
255:             * Returns true if an object matching the given identifier is currently
256:             * being updated by this monitor.
257:             *
258:             * @param identifier   an object that uniquely identifies an object on the
259:             *                     aggregation agent.  Must be able to use this object
260:             *                     as a hashtable key (i.e. must have proper equals()
261:             *                     and hashcode() methods).
262:             *
263:             * @return true if an object matching the given identifier is currently
264:             *         being updated by this monitor.
265:             */
266:            public boolean isMonitoring(Object identifier) {
267:                return monitoredObjectMap.containsKey(identifier);
268:            }
269:
270:            /**
271:             * Get the timer task to use to periodically pull incremental updates from
272:             * the aggregation agent.
273:             *
274:             * @return the timer task to use to periodically pull incremental updates
275:             * from the aggregation agent.  Returns null if monitor is not configured to
276:             * use the pull update method.
277:             */
278:            public TimerTask getPullTask() {
279:                return (updateMethod == PULL_METHOD) ? pullTask : null;
280:            }
281:
282:            /**
283:             * Cancel this monitor and any overhead associated with it.
284:             *
285:             * @return true, if successful.
286:             */
287:            public boolean cancel() {
288:                if (updateMethod == PULL_METHOD) {
289:                    boolean r = pullTask.cancel();
290:                    cancelPassiveSession();
291:                    return r;
292:                } else if ((updateMethod == KEEP_ALIVE_METHOD)
293:                        && (keepAliveThread != null)) {
294:                    synchronized (lock) {
295:                        notifyKeepAliveExit = true;
296:                        keepAliveThread = null; // will end keep alive thread
297:
298:                        // wait for thread to end
299:                        try {
300:                            lock.wait(10000);
301:                        } catch (InterruptedException e) {
302:                        }
303:                    }
304:
305:                    return true;
306:                }
307:                return false;
308:            }
309:
310:            /**
311:             * Must be defined by subclasses to provide a xml representation of a
312:             * given identifier.
313:             *
314:             * @param identifier   an object that uniquely identifies an object on the
315:             *                     aggregation agent.  Must be able to use this object
316:             *                     as a hashtable key (i.e. must have proper equals()
317:             *                     and hashcode() methods).
318:             *
319:             * @return a xml representation of given identifier.
320:             */
321:            protected abstract String createIdTag(Object identifier);
322:
323:            /**
324:             * Must be defined by subclasses to define what should be done when an
325:             * update event (either add or change) is reported by the aggregation agent
326:             * to a object described by the given xml element tree.
327:             *
328:             * @param monitoredElement xml element tree that describes the updated
329:             *                         monitored object.
330:             *
331:             * @return a live object updated based on the given xml
332:             */
333:            protected abstract Object update(Element monitoredElement);
334:
335:            /**
336:             * Must be defined by subclasses to define what should be done when a
337:             * remove event is reported by the aggregation agent to a object described
338:             * by the given xml element tree.
339:             *
340:             * @param monitoredElement xml element tree that describes the removed
341:             *                         monitored object.
342:             *
343:             * @return previously live object that was removed.
344:             */
345:            protected abstract Object remove(Element monitoredElement);
346:
347:            /**
348:             * Monitor a new object.  If object matching identifier is already being
349:             * monitored, existing live object is returned.  Otherwise, passed in
350:             * object becomes live.
351:             *
352:             * @param identifier   an object that uniquely identifies an object on the
353:             *                     aggregation agent.  Must be able to use this object
354:             *                     as a hashtable key (i.e. must have proper equals()
355:             *                     and hashcode() methods).
356:             * @param monitoredObj a valid object for this type of monitor.
357:             *
358:             * @return a live object that is actively being updated to match a subject
359:             *         object on the aggregation agent.
360:             */
361:            protected Object monitorObject(Object identifier,
362:                    Object monitoredObj) {
363:                Object existingMonitoredObj = getMonitoredObject(identifier);
364:                if (existingMonitoredObj == null) {
365:                    if (!monitorAllObjects)
366:                        cancelUpdateSession();
367:                    monitoredObjectMap.put(identifier, monitoredObj);
368:                    existingMonitoredObj = monitoredObj;
369:                    if (!monitorAllObjects)
370:                        createUpdateSession();
371:                }
372:                return existingMonitoredObj;
373:            }
374:
375:            /**
376:             * Remove this object from the set of objects being monitored.  This
377:             * method has a negligible effect if monitor-all is turned on
378:             * (old live object will die, but new one will take it's place if that
379:             *  object is still on the log plan).
380:             *
381:             * @param identifier   an object that uniquely identifies an object on the
382:             *                     aggregation agent.  Must be able to use this object
383:             *                     as a hashtable key (i.e. must have proper equals()
384:             *                     and hashcode() methods).
385:             *
386:             * @return previously live object that was removed.
387:             */
388:            protected Object stopMonitoringObject(Object identifier) {
389:                if (!monitorAllObjects)
390:                    cancelUpdateSession();
391:                Object removedObject = monitoredObjectMap.remove(identifier);
392:                if (!monitorAllObjects)
393:                    createUpdateSession();
394:                return removedObject;
395:            }
396:
397:            /**
398:             * Get a specific object being updated by this monitor.
399:             *
400:             * @param identifier   an object that uniquely identifies an object on the
401:             *                     aggregation agent.  Must be able to use this object
402:             *                     as a hashtable key (i.e. must have proper equals()
403:             *                     and hashcode() methods).
404:             *
405:             * @return a live object that is actively being updated to match a subject
406:             *         object on the aggregation agent.
407:             */
408:            protected Object getMonitoredObject(Object identifier) {
409:                return monitoredObjectMap.get(identifier);
410:            }
411:
412:            private void cancelUpdateSession() {
413:                if (updateMethod == PULL_METHOD) {
414:                    cancelPassiveSession();
415:                } else {
416:                    keepAliveThread = null; // flag thread to exit
417:                }
418:            }
419:
420:            private void createUpdateSession() {
421:                if (updateMethod == PULL_METHOD) {
422:                    requestPassiveSession();
423:                    pullTask.run();
424:                } else if (updateMethod == KEEP_ALIVE_METHOD) {
425:                    keepAliveThread = new Thread(keepAliveTask);
426:                    keepAliveThread.start();
427:                }
428:            }
429:
430:            private String createMonitorRequest() {
431:                String monitorRequest = null;
432:                if (!monitoredObjectMap.isEmpty() || monitorAllObjects) {
433:                    StringBuffer s = new StringBuffer(
434:                            "<monitor_session type=\"");
435:                    s.append(monitorTag);
436:                    s.append("\">\n");
437:                    if (monitorAllObjects) {
438:                        s.append("<monitor_all />\n");
439:                    } else {
440:                        for (Iterator i = monitoredObjectMap.keySet()
441:                                .iterator(); i.hasNext();) {
442:                            s.append(createIdTag(i.next()));
443:                        }
444:                    }
445:                    s.append("</monitor_session>");
446:                    monitorRequest = s.toString();
447:                }
448:                return monitorRequest;
449:            }
450:
451:            private String requestPassiveSession() {
452:                String passiveSessionRequest = createMonitorRequest();
453:                if (passiveSessionRequest == null)
454:                    return null;
455:
456:                // Request a passive session on Aggregation Agent
457:                String loadedURL = serverURL + "&CREATE_PASSIVE_SESSION=1";
458:                passiveSessionKey = XmlUtils.requestString(loadedURL,
459:                        passiveSessionRequest);
460:                return passiveSessionKey;
461:            }
462:
463:            private String cancelPassiveSession() {
464:                String response = null;
465:                if (passiveSessionKey != null) {
466:                    // Cancel passive session on Aggregation Agent
467:                    String loadedURL = serverURL
468:                            + "&CANCEL_PASSIVE_SESSION=1&SESSION_ID="
469:                            + passiveSessionKey;
470:                    response = XmlUtils.requestString(loadedURL, null);
471:                    passiveSessionKey = null;
472:                }
473:                return response;
474:            }
475:
476:            private void updateMonitoredObjects(Element incrementalUpdate) {
477:                // update result set based on incremental change xml
478:                NodeList nl = incrementalUpdate.getChildNodes();
479:                for (int i = 0; i < nl.getLength(); i++) {
480:                    Node n = nl.item(i);
481:                    if (n.getNodeType() == Node.ELEMENT_NODE) {
482:                        Element child = (Element) n;
483:                        String s = child.getNodeName();
484:                        if (s.equals("added")) {
485:                            addAll(child);
486:                        }
487:                        if (s.equals("changed")) {
488:                            changeAll(child);
489:                        } else if (s.equals("removed")) {
490:                            removeAll(child);
491:                        }
492:                    }
493:                }
494:            }
495:
496:            private void addAll(Element monitoredObjectsParent) {
497:                NodeList nl = monitoredObjectsParent
498:                        .getElementsByTagName(monitorTag);
499:                for (int i = 0; i < nl.getLength(); i++) {
500:                    Object updatedObject = update((Element) nl.item(i));
501:                    updateObservable.fireObjectAdded(updatedObject);
502:                }
503:            }
504:
505:            private void changeAll(Element monitoredObjectsParent) {
506:                NodeList nl = monitoredObjectsParent
507:                        .getElementsByTagName(monitorTag);
508:                for (int i = 0; i < nl.getLength(); i++) {
509:                    Object updatedObject = update((Element) nl.item(i));
510:                    updateObservable.fireObjectChanged(updatedObject);
511:                }
512:            }
513:
514:            private void removeAll(Element monitoredObjectsParent) {
515:                NodeList nl = monitoredObjectsParent
516:                        .getElementsByTagName(monitorTag);
517:                for (int i = 0; i < nl.getLength(); i++) {
518:                    Object removedObject = remove((Element) nl.item(i));
519:                    updateObservable.fireObjectRemoved(removedObject);
520:                }
521:            }
522:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.