001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.glm.servlet;
028:
029: import java.io.IOException;
030: import java.io.InputStream;
031: import java.io.OutputStream;
032: import java.util.Collection;
033: import java.util.Date;
034: import java.util.Enumeration;
035: import java.util.HashSet;
036: import java.util.Iterator;
037: import java.util.Set;
038: import java.util.StringTokenizer;
039:
040: import javax.servlet.ServletException;
041: import javax.servlet.http.HttpServletRequest;
042: import javax.servlet.http.HttpServletResponse;
043:
044: import org.cougaar.core.agent.service.alarm.Alarm;
045: import org.cougaar.core.blackboard.IncrementalSubscription;
046: import org.cougaar.core.blackboard.SubscriptionWatcher;
047: import org.cougaar.core.service.BlackboardService;
048: import org.cougaar.core.service.SchedulerService;
049: import org.cougaar.core.servlet.SimpleServletSupport;
050: import org.cougaar.glm.ldm.Constants;
051: import org.cougaar.lib.util.UTILAllocate;
052: import org.cougaar.planning.ldm.plan.PlanElement;
053: import org.cougaar.planning.ldm.plan.Task;
054: import org.cougaar.planning.ldm.plan.Verb;
055: import org.cougaar.planning.servlet.ServletWorker;
056: import org.cougaar.planning.servlet.data.xml.XMLable;
057: import org.cougaar.util.DynamicUnaryPredicate;
058: import org.cougaar.util.SyncTriggerModelImpl;
059: import org.cougaar.util.Trigger;
060: import org.cougaar.util.TriggerModel;
061:
062: /**
063: * <pre>
064: * One created for every URL access.
065: *
066: * Uses a blackboard
067: * service, a watcher, and a trigger to monitor published tasks to see
068: * when they are complete.
069: *
 * Note : may not return if it can't go through all its states.
071: * </pre>
072: **/
073: public class CompletionWatcherWorker extends ServletWorker {
074: private static final int BEFORE_FIRST = 0;
075: private static final int DURING_FIRST = 1;
076: private static final int END_OF_FIRST = 2;
077: private static final int AFTER_FIRST = 3;
078: private static final int DURING_SECOND = 4;
079: private static final int AFTER_SECOND = 5;
080:
081: private static final String USAGE_IMAGE = "WatcherServlet.jpg";
082:
083: /**
084: * <pre>
085: * Handles all HTTP and HTTPS service requests.
086: *
087: * Makes a watcher to watch
088: * the blackboard, a trigger that will call blackboardChanged (),
089: * and a trigger model to connect them.
090: *
091: * When a change to a subscription happens, the method call cascade is :
092: * 1) The watcher's signalNotify method is called, which
093: * 2) Triggers the TriggerModel
094: * 3) The TriggerModel queues the trigger with the SchedulerService
095: * 4) The SchedulerService calls my Trigger's trigger method
096: * 5) That trigger method calls blackboardChanged
097: *
098: * Does these in order :
099: * - Creates all the watcher, trigger model, trigger overhead
100: * - Creates subscriptions to tasks and plan elements
101: * - Calls getResult
102: * - Writes results to output stream with writeResponse ()
103: * - Unsubscribes
104: * - Has the watcher unregister interest
105: * - Tells the trigger model to stop.
106: *
107: * </pre>
108: * @see #getResult
109: * @see #blackboardChanged
110: * @see org.cougaar.planning.servlet.ServletWorker#writeResponse
111: * @see org.cougaar.core.blackboard.SubscriptionWatcher#signalNotify
112: * @see org.cougaar.util.Trigger#trigger
113: * @see org.cougaar.util.SyncTriggerModelImpl
114: * @see org.cougaar.core.service.SchedulerService
115: */
116: public void execute(HttpServletRequest request,
117: HttpServletResponse response, SimpleServletSupport support)
118: throws IOException, ServletException {
119: this .support = (CompletionWatcherSupport) support;
120:
121: Enumeration params = request.getParameterNames();
122: for (; params.hasMoreElements();) {
123: String name = (String) params.nextElement();
124: String value = request.getParameter(name);
125: getSettings(name, value);
126: }
127:
128: if (support.getLog().isDebugEnabled())
129: support.getLog()
130: .debug("CompletionWatcherWorker.Invoked...");
131:
132: if (getImage) {
133: getUsageImage(response);
134: return;
135: }
136:
137: // create a blackboard watcher
138: watcher = new SubscriptionWatcher() {
139: public void signalNotify(int event) {
140: // gets called frequently as the blackboard objects change
141: super .signalNotify(event);
142: tm.trigger();
143: }
144:
145: public String toString() {
146: return "ThinWatcher("
147: + CompletionWatcherWorker.this .toString() + ")";
148: }
149: };
150:
151: // create a callback for running this component
152: Trigger myTrigger = new Trigger() {
153: // no need to "sync" when using "SyncTriggerModel"
154: public void trigger() {
155: watcher.clearSignal();
156: blackboardChanged();
157: }
158:
159: public String toString() {
160: return "Trigger("
161: + CompletionWatcherWorker.this .toString() + ")";
162: }
163: };
164:
165: this .tm = new SyncTriggerModelImpl(this .support
166: .getSchedulerService(), myTrigger);
167: this .support.getBlackboardService().registerInterest(watcher);
168: this .support.getBlackboardService().openTransaction();
169: taskSubscription = (IncrementalSubscription) this .support
170: .getBlackboardService().subscribe(taskPredicate);
171: planElementSubscription = (IncrementalSubscription) this .support
172: .getBlackboardService().subscribe(planElementPredicate);
173: this .support.getBlackboardService().closeTransaction();
174: // activate the trigger model
175: tm.initialize();
176: tm.load();
177: tm.start();
178:
179: // returns an "XMLable" result.
180: XMLable result = getResult();
181:
182: if (result != null)
183: writeResponse(result, response.getOutputStream(), request,
184: support, format);
185:
186: BlackboardService bb = this .support.getBlackboardService();
187: bb.openTransaction();
188:
189: bb.unsubscribe(taskSubscription);
190: bb.unsubscribe(planElementSubscription);
191: bb.closeTransaction();
192: bb.unregisterInterest(watcher);
193:
194: tm.unload();
195: tm.stop();
196: }
197:
198: /** way to do img tags in a cougaar servlet */
199: protected void getUsageImage(HttpServletResponse response)
200: throws IOException, ServletException {
201: InputStream fin;
202: String fileName = USAGE_IMAGE;
203:
204: fin = getClass().getResource(fileName).openStream();
205:
206: if (fin == null) {
207: response.sendError(HttpServletResponse.SC_NOT_FOUND,
208: "Unable to open file \"" + fileName + "\"");
209: support.getLog().error(
210: ".getUsageImage - Tried to find image " + fileName
211: + " but couldn't."
212: + " Should be in this directory...");
213: }
214:
215: String contentType = guessContentType(fileName, fin);
216: if (contentType != null) {
217: response.setContentType(contentType);
218: }
219:
220: // maybe add client "last-modified" header?
221:
222: OutputStream out = response.getOutputStream();
223: byte[] buf = new byte[1024];
224: while (true) {
225: int len = fin.read(buf);
226: if (len < 0) {
227: break;
228: }
229: out.write(buf, 0, len);
230: }
231:
232: fin.close();
233: out.flush();
234: out.close();
235: }
236:
237: private String guessContentType(String fileName, InputStream fin)
238: throws IOException {
239: // examine the first couple bytes of the stream:
240: return java.net.URLConnection.guessContentTypeFromStream(fin);
241: // or instead examine the filename extention (.gif, etc)
242: }
243:
244: /**
245: * Matches PlanElements for tasks that we have sent and which are
246: * complete, i.e. have plan elements and 100% confident reported/estimated
247: * allocation results. Both Failure and Success are accepted.
248: **/
249: private DynamicUnaryPredicate planElementPredicate = new DynamicUnaryPredicate() {
250: public boolean execute(Object o) {
251: if (o instanceof PlanElement) {
252: PlanElement pe = (PlanElement) o;
253: Task task = pe.getTask();
254:
255: if (!isVerbIncluded(task.getVerb()))
256: return false;
257:
258: boolean hasReported = (pe.getReportedResult() != null);
259: boolean hasEstimated = (pe.getEstimatedResult() != null);
260: boolean highConfidence = false;
261:
262: if (hasReported)
263: highConfidence = (pe.getReportedResult()
264: .getConfidenceRating() >= UTILAllocate.HIGHEST_CONFIDENCE);
265: else if (hasEstimated)
266: highConfidence = (pe.getEstimatedResult()
267: .getConfidenceRating() >= UTILAllocate.HIGHEST_CONFIDENCE);
268:
269: return highConfidence;
270: }
271: return false;
272: }
273: };
274:
275: /**
276: * Looking for non-ReportForService tasks
277: **/
278: private DynamicUnaryPredicate taskPredicate = new DynamicUnaryPredicate() {
279: public boolean execute(Object o) {
280: if (o instanceof Task) {
281: Task task = (Task) o;
282: return isVerbIncluded(task.getVerb());
283: }
284: return false;
285: }
286: };
287:
288: protected boolean isVerbIncluded(Verb verb) {
289: if (verb.equals(Constants.Verb.ReportForService))
290: return false;
291: else if (verbsToInclude.isEmpty())
292: return true;
293: else
294: return verbsToInclude.contains(verb);
295: }
296:
297: // unused
298: protected String getPrefix() {
299: return "CompletionWatcher at ";
300: }
301:
302: /**
303: * <pre>
304: * Use a query parameter to set a field
305: *
306: * Sets the recognized parameters : firstInterval, secondInterval
307: * </pre>
308: */
309: public void getSettings(String name, String value) {
310: super .getSettings(name, value);
311:
312: if (support.getLog().isDebugEnabled())
313: support.getLog().debug(
314: "CompletionWatcherWorker.getSettings - name "
315: + name + " value " + value);
316:
317: if (eq(name, CompletionWatcherServlet.FIRST_INTERVAL))
318: firstInterval = Integer.parseInt(value);
319: else if (eq(name, CompletionWatcherServlet.SECOND_INTERVAL))
320: secondInterval = Integer.parseInt(value);
321: else if (eq(name, CompletionWatcherServlet.VERBS_TO_INCLUDE))
322: verbsToInclude = parseVerbs(value);
323: else if (eq(name, "getImage"))
324: getImage = true;
325: }
326:
327: protected Collection parseVerbs(String verbs) {
328: StringTokenizer tokenizer = new StringTokenizer(verbs, ",");
329: Set verbSet = new HashSet();
330:
331: while (tokenizer.hasMoreTokens())
332: verbSet.add(Verb.get(tokenizer.nextToken().trim()));
333:
334: return verbSet;
335: }
336:
337: /**
338: * Main work done here. <p>
339: *
340: * Calls blackboard changed initially, then waits until the state machine has reached "done." <p>
341: * Then records run elapsed time.
342: *
343: * @see #getSettings
344: * @see #blackboardChanged
345: * @return an "XMLable" result.
346: */
347: protected XMLable getResult() {
348: while (!isDone()) {
349: if (support.getLog().isDebugEnabled())
350: support
351: .getLog()
352: .debug(
353: "CompletionWatcherWorker.getResult - checking quiet...");
354:
355: blackboardChanged();
356:
357: synchronized (doneSignal) {
358: if (!isDone())
359: try {
360: doneSignal.wait();
361: } catch (Exception e) {
362: }
363: }
364: }
365:
366: long elapsed = end - start;
367: String readable = getElapsedTime(elapsed);
368:
369: responseData.addTaskAndTime("interval", readable, elapsed);
370:
371: return responseData;
372: }
373:
374: /**
375: * <pre>
376: * Called when either
377: *
378: * 1) the task subscription changes
379: * 2) the plan element subscription changes
380: * 3) a timer expires
381: *
382: * Does book-keeping to keep track of incomplete tasks.
383: * (If a plan element has completed, it's task is removed from the
384: * set of incomplete tasks.)
385: *
386: * Given whether all tasks are complete, if any arrived in the last transaction,
387: * or if a timer expired, calls advance state.
388: *
389: * </pre>
390: **/
391: protected void blackboardChanged() {
392: try {
393: support.getBlackboardService().openTransaction();
394:
395: boolean haveChanged = support.getBlackboardService()
396: .haveCollectionsChanged();
397:
398: boolean planElementChanged = planElementSubscription
399: .hasChanged();
400: boolean taskChanged = taskSubscription.hasChanged();
401: Collection addedCollection = taskSubscription
402: .getAddedCollection();
403: boolean hadNewTasks = !addedCollection.isEmpty();
404:
405: if (hadNewTasks && support.getLog().isDebugEnabled())
406: support.getLog().debug(
407: "blackboardChanged called - had "
408: + addedCollection.size()
409: + " NEW tasks.");
410:
411: if (taskChanged)
412: incompleteTasks.addAll(addedCollection);
413:
414: int numBefore = incompleteTasks.size();
415: int numPEAdded = planElementSubscription
416: .getAddedCollection().size();
417:
418: if (planElementChanged) {
419: Collection addedItems = planElementSubscription
420: .getAddedCollection();
421: numPEAdded = addedItems.size();
422:
423: for (Iterator iter = addedItems.iterator(); iter
424: .hasNext();) {
425: PlanElement pe = (PlanElement) iter.next();
426: support.getLog().debug(
427: "blackboardChanged called - complete PE "
428: + pe.getUID() + " added.");
429: incompleteTasks.remove(pe.getTask());
430: }
431: }
432:
433: if (support.getLog().isDebugEnabled()) {
434: int numPEChanged = planElementSubscription
435: .getChangedCollection().size();
436: int numPERemoved = planElementSubscription
437: .getRemovedCollection().size();
438: support.getLog().debug(
439: "blackboardChanged called - incomplete tasks before "
440: + numBefore + " after "
441: + incompleteTasks.size());
442: support.getLog().debug(
443: "blackboardChanged called - complete PE added "
444: + numPEAdded
445: + " changed "
446: + numPEChanged
447: + " removed "
448: + numPERemoved
449: + " total "
450: + planElementSubscription
451: .getCollection().size());
452: }
453:
454: boolean allComplete = incompleteTasks.isEmpty();
455: if (allComplete || planElementChanged || taskChanged
456: || timerExpired())
457: advanceState(allComplete, hadNewTasks, timerExpired());
458: } catch (Exception exc) {
459: } finally {
460: support.getBlackboardService().closeTransaction();
461: }
462: }
463:
464: /**
465: * Calls changeState -- then tests if isDone and if so signals doneSignal in getResult ().
466: * @see #getResult
467: */
468: protected void advanceState(boolean allComplete,
469: boolean hadNewTasks, boolean expired) {
470: int before = state;
471: changeState(expired, allComplete, hadNewTasks);
472:
473: if (support.getLog().isInfoEnabled()
474: || support.getLog().isDebugEnabled()) {
475: if (state != before) {
476: support.getLog().info(
477: "advanceState - state before " + before
478: + " after " + state);
479: support.getLog().debug(
480: "advanceState - timerExpired " + expired
481: + " all tasks complete " + allComplete
482: + " had new tasks " + hadNewTasks
483: + " incompleteTasks "
484: + incompleteTasks.size());
485: } else {
486: support.getLog().debug(
487: "advanceState - incompleteTasks is "
488: + incompleteTasks.size()
489: + " added "
490: + taskSubscription.getAddedCollection()
491: .size()
492: + " removed "
493: + taskSubscription
494: .getRemovedCollection().size()
495: + " changed "
496: + taskSubscription
497: .getChangedCollection().size());
498: support.getLog().debug(
499: "advanceState - state (" + state + ")"
500: + " timerExpired " + expired
501: + " all tasks complete " + allComplete
502: + " had new tasks " + hadNewTasks);
503: }
504: }
505:
506: if (isDone()) {
507: synchronized (doneSignal) {
508: doneSignal.notify();
509:
510: if (support.getLog().isInfoEnabled())
511: support.getLog().info(
512: "advanceState - run time was "
513: + (end - start) + " millis.");
514: }
515: }
516: }
517:
518: /** we're done if the state machine is in the final state */
519: protected boolean isDone() {
520: return (state == AFTER_SECOND);
521: }
522:
523: /**
524: * <pre>
525: * States are :
526: * BEFORE_FIRST - some tasks are incomplete
527: * Transition : when all are complete, start first timer, go to next state
528: * DURING_FIRST - all tasks are complete
529: * Transition : waited first interval, go to next state
530: * Transition : saw incomplete task before time elapsed, go to previous state
531: * END_OF_FIRST - the required quiet interval has elapsed
532: * Transition : saw first incomplete task, mark time, go to next state
533: * AFTER_FIRST - there are some incomplete tasks
534: * Transition : all tasks complete, mark time, start second timer, go to next state
535: * DURING_SECOND - all tasks are complete again, we're possibly done
536: * Transition : waited second interval, go to next state
537: * Transition : saw incomplete task before time elapsed, forget end time, go to previous state
538: * AFTER_SECOND - we waited long enough to make sure we're done
539: *
540: * </pre>
541: * @param waitedLongEnough -- if the timer expired, signaling the required wait was completed
542: * @param allComplete - are all tasks complete
543: * @param hadNewTasks - even if all tasks were complete, did any new tasks appear?
544: */
545: protected void changeState(boolean waitedLongEnough,
546: boolean allComplete, boolean hadNewTasks) {
547: switch (state) {
548: case BEFORE_FIRST:
549: if (allComplete) {
550: state++;
551: startFirstWait();
552: }
553: break;
554: case DURING_FIRST:
555: if (waitedLongEnough) {
556: state++;
557: cancelTimer();
558: } else if (!allComplete) {
559: state--;
560: cancelTimer();
561: }
562: break;
563: case END_OF_FIRST:
564: if (!allComplete || hadNewTasks) {
565: start = System.currentTimeMillis();
566: support.getLog().info("start is " + new Date(start));
567: state++;
568: }
569: break;
570: case AFTER_FIRST:
571: if (allComplete) {
572: state++;
573: end = System.currentTimeMillis();
574: support.getLog().info(
575: "possible end is " + new Date(end));
576: startSecondWait();
577: }
578: break;
579: case DURING_SECOND:
580: if (waitedLongEnough)
581: state++;
582: else if (!allComplete || hadNewTasks) {
583: state--;
584: cancelTimer();
585: }
586: break;
587: case AFTER_SECOND:
588: break;
589: }
590: ;
591: }
592:
593: protected void startFirstWait() {
594: if (support.getLog().isDebugEnabled())
595: support.getLog().debug("Starting first timer");
596:
597: startTimer(firstInterval);
598: }
599:
600: protected void startSecondWait() {
601: if (support.getLog().isDebugEnabled())
602: support.getLog().debug("Starting second timer");
603:
604: startTimer(secondInterval);
605: }
606:
607: /**
608: * Schedule a update wakeup after some interval of time. <p>
609: * Uses an alarm.
610: * @param delay how long to delay before the timer expires.
611: * @see org.cougaar.core.agent.service.alarm.Alarm
612: * @see org.cougaar.core.service.AlarmService#addRealTimeAlarm
613: **/
614: protected void startTimer(final long delay) {
615: if (timer != null)
616: return; // update already scheduled
617: if (support.getLog().isDebugEnabled())
618: support.getLog()
619: .debug("Starting timer with delay " + delay);
620:
621: timer = new Alarm() {
622: long expirationTime = System.currentTimeMillis() + delay
623: * 1000l;
624: boolean expired = false;
625:
626: public long getExpirationTime() {
627: return expirationTime;
628: }
629:
630: public synchronized void expire() {
631: if (!expired) {
632: expired = true;
633: {
634: org.cougaar.core.service.BlackboardService bbs = support
635: .getBlackboardService();
636: if (bbs != null)
637: bbs.signalClientActivity();
638: }
639: }
640: }
641:
642: public boolean hasExpired() {
643: return expired;
644: }
645:
646: public synchronized boolean cancel() {
647: boolean was = expired;
648: expired = true;
649: return was;
650: }
651: };
652: support.getAlarmService().addRealTimeAlarm(timer);
653: }
654:
655: /**
656: * Cancel the timer.
657: **/
658: protected void cancelTimer() {
659: if (timer == null)
660: return;
661: if (support.getLog().isDebugEnabled())
662: support.getLog().debug("Cancelling timer");
663: timer.cancel();
664: timer = null;
665: }
666:
667: /**
668: * Test if the timer has expired.
669: * @return false if the timer is not running or has not yet expired
670: * else return true.
671: **/
672: protected boolean timerExpired() {
673: return timer != null && timer.hasExpired();
674: }
675:
676: private Alarm timer;
677:
678: /** encodes a time interval in a min:sec:millis format */
679: protected String getElapsedTime(long diff) {
680: long min = diff / 60000l;
681: long sec = (diff - (min * 60000l)) / 1000l;
682: long millis = diff - (min * 60000l) - (sec * 1000l);
683: return min + ":" + ((sec < 10) ? "0" : "") + sec + ":"
684: + ((millis < 10) ? "00" : ((millis < 100) ? "0" : ""))
685: + millis;
686: }
687:
688: // rely upon load-time introspection to set these services -
689: // don't worry about revokation.
690: public final void setSchedulerService(SchedulerService ss) {
691: scheduler = ss;
692: }
693:
694: /**
695: * Subscription to PlanElements disposing of our tasks
696: **/
697: private IncrementalSubscription planElementSubscription;
698:
699: /**
700: * Subscription to PlanElements disposing of our tasks
701: **/
702: private IncrementalSubscription taskSubscription;
703:
704: /** for waiting for a subscription on the blackboard */
705: protected SubscriptionWatcher watcher;
706: /** for waiting for a subscription on the blackboard */
707: protected TriggerModel tm;
708: /** for waiting for a subscription on the blackboard */
709: private SchedulerService scheduler;
710:
711: /** returned response */
712: protected GLMStimulatorResponseData responseData = new GLMStimulatorResponseData();
713:
714: protected Object doneSignal = new Object();
715: protected int state = BEFORE_FIRST;
716: protected long start, end;
717:
718: protected long firstInterval = 10l, secondInterval = 10l; // seconds
719: protected boolean getImage = false;
720: protected Set incompleteTasks = new HashSet();
721: protected Collection verbsToInclude = new HashSet();
722:
723: protected CompletionWatcherSupport support;
724: }
|