001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026: package org.cougaar.core.examples.mobility.script;
027:
028: import java.util.ArrayList;
029: import java.util.Collection;
030: import java.util.Enumeration;
031: import java.util.HashMap;
032: import java.util.Iterator;
033: import java.util.List;
034: import java.util.Map;
035: import org.cougaar.core.agent.service.alarm.Alarm;
036: import org.cougaar.core.blackboard.CollectionSubscription;
037: import org.cougaar.core.blackboard.IncrementalSubscription;
038: import org.cougaar.core.component.ServiceBroker;
039: import org.cougaar.core.examples.mobility.ldm.MobilityTestFactory;
040: import org.cougaar.core.examples.mobility.ldm.Proc;
041: import org.cougaar.core.examples.mobility.ldm.Script;
042: import org.cougaar.core.examples.mobility.ldm.ScriptGoto;
043: import org.cougaar.core.examples.mobility.ldm.ScriptLabel;
044: import org.cougaar.core.examples.mobility.ldm.ScriptStep;
045: import org.cougaar.core.examples.mobility.ldm.Step;
046: import org.cougaar.core.examples.mobility.ldm.StepOptions;
047: import org.cougaar.core.examples.mobility.ldm.StepStatus;
048: import org.cougaar.core.mobility.Ticket;
049: import org.cougaar.core.mobility.ldm.MobilityFactory;
050: import org.cougaar.core.mobility.ldm.MoveAgent;
051: import org.cougaar.core.mobility.ldm.MoveAgent.Status;
052: import org.cougaar.core.mts.MessageAddress;
053: import org.cougaar.core.node.NodeIdentificationService;
054: import org.cougaar.core.plugin.ComponentPlugin;
055: import org.cougaar.core.service.AgentIdentificationService;
056: import org.cougaar.core.service.AlarmService;
057: import org.cougaar.core.service.BlackboardService;
058: import org.cougaar.core.service.DomainService;
059: import org.cougaar.core.service.LoggingService;
060: import org.cougaar.core.util.UID;
061: import org.cougaar.core.util.UniqueObject;
062: import org.cougaar.util.UnaryPredicate;
063:
064: /**
065: * This plugin watches for new script Procs and
066: * runs them.
067: * <p>
068: * Steps are created by this plugin.
069: */
070: public class ProcRunnerPlugin extends ComponentPlugin {
071:
072: private static final UnaryPredicate PROCESS_PRED = new UnaryPredicate() {
073: public boolean execute(Object o) {
074: return (o instanceof Proc);
075: }
076: };
077:
078: private static final UnaryPredicate SCRIPT_PRED = new UnaryPredicate() {
079: public boolean execute(Object o) {
080: return (o instanceof Script);
081: }
082: };
083:
084: private MessageAddress todd;
085: private MessageAddress agentId;
086: private MessageAddress nodeId;
087:
088: private IncrementalSubscription scriptSub;
089: private IncrementalSubscription procSub;
090: private IncrementalSubscription stepSub;
091:
092: private LoggingService log;
093: private DomainService domain;
094:
095: private MobilityFactory mobilityFactory;
096: private MobilityTestFactory mobilityTestFactory;
097:
098: // Non-persisted cache of blackboard objects, for quick access.
099: // On rehydration it is fully reconstructed from the blackboard.
100: //
101: // maps proc UID to internal Entry
102: private Map uidToEntry = new HashMap(13);
103:
104: public void load() {
105: super .load();
106:
107: // get the logger
108: log = (LoggingService) getServiceBroker().getService(this ,
109: LoggingService.class, null);
110: if (log == null) {
111: log = LoggingService.NULL;
112: }
113:
114: // get the agentId
115: AgentIdentificationService agentIdService = (AgentIdentificationService) getServiceBroker()
116: .getService(this , AgentIdentificationService.class,
117: null);
118: if (agentIdService == null) {
119: throw new RuntimeException(
120: "Unable to obtain agent-id service");
121: }
122: this .agentId = agentIdService.getMessageAddress();
123: getServiceBroker().releaseService(this ,
124: AgentIdentificationService.class, agentIdService);
125: if (agentId == null) {
126: throw new RuntimeException("Unable to obtain agent id");
127: }
128: todd = agentId;
129:
130: // get the nodeId
131: NodeIdentificationService nodeIdService = (NodeIdentificationService) getServiceBroker()
132: .getService(this , NodeIdentificationService.class, null);
133: if (nodeIdService == null) {
134: throw new RuntimeException(
135: "Unable to obtain node-id service");
136: }
137: this .nodeId = nodeIdService.getMessageAddress();
138: getServiceBroker().releaseService(this ,
139: NodeIdentificationService.class, nodeIdService);
140: if (nodeId == null) {
141: throw new RuntimeException("Unable to obtain node id");
142: }
143:
144: // get the mobility factories
145: this .domain = (DomainService) getServiceBroker().getService(
146: this , DomainService.class, null);
147: if (domain == null) {
148: throw new RuntimeException(
149: "Unable to obtain domain service");
150: }
151: this .mobilityFactory = (MobilityFactory) domain
152: .getFactory("mobility");
153: if (mobilityFactory == null) {
154: throw new RuntimeException(
155: "Mobility factory (and domain \"mobility\")"
156: + " not enabled");
157: }
158: this .mobilityTestFactory = (MobilityTestFactory) domain
159: .getFactory("mobilityTest");
160: if (mobilityTestFactory == null) {
161: throw new RuntimeException(
162: "Mobility Test factory (and domain"
163: + " \"mobilityTest\") not enabled");
164: }
165:
166: if (log.isDebugEnabled()) {
167: log.debug(todd + "Loaded");
168: }
169: }
170:
171: public void unload() {
172: if (domain != null) {
173: getServiceBroker().releaseService(this ,
174: DomainService.class, domain);
175: domain = null;
176: }
177: if ((log != null) && (log != LoggingService.NULL)) {
178: getServiceBroker().releaseService(this ,
179: LoggingService.class, log);
180: log = LoggingService.NULL;
181: }
182: super .unload();
183: }
184:
185: protected void setupSubscriptions() {
186: // subscribe to procs that we'll execute
187: procSub = (IncrementalSubscription) blackboard
188: .subscribe(PROCESS_PRED);
189:
190: // subscribe to scripts, in case they're removed
191: scriptSub = (IncrementalSubscription) blackboard
192: .subscribe(SCRIPT_PRED);
193:
194: // subscribe to our own steps
195: stepSub = (IncrementalSubscription) blackboard
196: .subscribe(createStepPredicate(agentId));
197:
198: if (blackboard.didRehydrate()) {
199: // recreate cache from blackboard
200: recreateEntries();
201: } else {
202: // create new cache
203: }
204: }
205:
206: protected void execute() {
207: if (log.isDebugEnabled()) {
208: log.debug(todd + "Execute");
209: }
210:
211: // watch procs
212: if (procSub.hasChanged()) {
213: // added procs
214: Enumeration en = procSub.getAddedList();
215: while (en.hasMoreElements()) {
216: Proc proc = (Proc) en.nextElement();
217: addedProc(proc);
218: }
219: // ignore changes: all are done this plugin
220: // watch removes
221: en = procSub.getRemovedList();
222: while (en.hasMoreElements()) {
223: Proc proc = (Proc) en.nextElement();
224: removedProc(proc);
225: }
226: }
227:
228: // watch scripts
229: if (scriptSub.hasChanged()) {
230: // ignore adds: separate plugins create procs
231: // ignore changes: immutable
232: Enumeration en = scriptSub.getRemovedList();
233: while (en.hasMoreElements()) {
234: Script script = (Script) en.nextElement();
235: removedScript(script);
236: }
237: }
238:
239: // watch steps
240: if (stepSub.hasChanged()) {
241: // ignore adds: this plugin did it
242: // watch changes
243: Enumeration en = stepSub.getChangedList();
244: while (en.hasMoreElements()) {
245: Step step = (Step) en.nextElement();
246: changedStep(step);
247: }
248: // ignore removes
249: }
250: }
251:
252: private void addedProc(Proc proc) {
253:
254: UID procUID = proc.getUID();
255: UID scriptUID = proc.getScriptUID();
256:
257: // see if already listed
258: Object oldEntry = uidToEntry.get(procUID);
259: if (oldEntry != null) {
260: // maybe just rehydration
261: if (log.isDebugEnabled()) {
262: log.debug("Proc " + procUID + " with script "
263: + scriptUID + " overrides existing entry");
264: }
265: return;
266: }
267:
268: // lookup script
269: Script script = null;
270: Collection scripts = scriptSub.getCollection();
271: Iterator iter = scripts.iterator();
272: for (int i = 0, n = scripts.size(); i < n; i++) {
273: Script s = (Script) iter.next();
274: if (scriptUID.equals(s.getUID())) {
275: script = s;
276: break;
277: }
278: }
279: if (script == null) {
280: if (log.isErrorEnabled()) {
281: log.error("Proc " + procUID
282: + " added with unknown script " + scriptUID);
283: }
284: return;
285: }
286:
287: // create entry
288: Entry entry = new Entry(proc, script);
289: uidToEntry.put(procUID, entry);
290:
291: advanceEntry(entry);
292: }
293:
294: private void removedProc(Proc proc) {
295: // find entry
296: Entry entry = (Entry) uidToEntry.remove(proc.getUID());
297: if (entry == null) {
298: // not listed?
299: return;
300: }
301:
302: // kill step
303: Step step = entry.step;
304: if (step != null) {
305: blackboard.publishRemove(step);
306: }
307: }
308:
309: private void changedStep(Step step) {
310: // find entry
311: Entry entry;
312: if (uidToEntry.isEmpty()) {
313: // not listed?
314: return;
315: } else {
316: int n = uidToEntry.size();
317: Iterator iter = uidToEntry.values().iterator();
318: for (int i = 0;; i++) {
319: if (i >= n) {
320: // not listed?
321: return;
322: }
323: Entry e = (Entry) iter.next();
324: if (e.step != step) {
325: continue;
326: }
327: entry = e;
328: break;
329: }
330: }
331: advanceEntry(entry);
332: }
333:
334: private void advanceEntry(Entry entry) {
335: Proc proc = entry.proc;
336: UID procUID = proc.getUID();
337: Script script = entry.script;
338:
339: Step step = entry.step;
340:
341: // check for step in blackboard, in case of restart
342: if (step == null) {
343: Collection c = stepSub.getCollection();
344: if (!(c.isEmpty())) {
345: int n = c.size();
346: Iterator iter = c.iterator();
347: for (int i = 0; i < n; i++) {
348: Step s = (Step) iter.next();
349: Object ownerId = s.getOptions().getOwnerId();
350: if (procUID.equals(ownerId)) {
351: step = s;
352: entry.step = s;
353: break;
354: }
355: }
356: }
357: }
358:
359: // check step status
360: long startTime = -1;
361: if (step != null) {
362: StepStatus status = step.getStatus();
363: startTime = status.getStartTime();
364: long endTime = status.getEndTime();
365: if (endTime <= 0) {
366: if (log.isDebugEnabled()) {
367: log.debug("Still working on step " + step.getUID()
368: + " <" + status.getStateAsString()
369: + "> of proc " + procUID);
370: }
371: return;
372: }
373: if (status.getState() != StepStatus.SUCCESS) {
374: if (log.isErrorEnabled()) {
375: log.error(status.getStateAsString() + " on step "
376: + step.getUID() + " (proc " + procUID
377: + ", script " + script.getUID()
378: + "), proc failed!");
379: }
380: // abort proc on failure
381: proc.setEndTime(System.currentTimeMillis());
382: blackboard.publishChange(proc);
383: // remove entry
384: uidToEntry.remove(entry);
385: // leave step for debugging
386: return;
387: }
388: if (log.isDebugEnabled()) {
389: log.debug("Completed step " + step.getUID()
390: + " of proc " + proc.getUID() + " with status "
391: + status);
392: }
393: // remove completed step
394: blackboard.publishRemove(step);
395: step = null;
396: entry.step = null;
397: }
398:
399: // get next script index
400: int nextIdx = 1 + proc.getScriptIndex();
401: int nGotos = 0;
402: ScriptStep nextScriptStep;
403: while (true) {
404: if (nextIdx >= script.getSize()) {
405: // end of script
406: if (log.isInfoEnabled()) {
407: log.info("Completed proc " + procUID
408: + " of script " + script.getUID());
409: }
410: // update proc
411: proc.setEndTime(System.currentTimeMillis());
412: proc.setScriptIndex(nextIdx);
413: proc.setStepUID(null);
414: blackboard.publishChange(proc);
415:
416: // update entry
417: entry.step = null;
418: return;
419: }
420: Script.Entry x = script.getEntry(nextIdx);
421: if (x instanceof ScriptStep) {
422: nextScriptStep = (ScriptStep) x;
423: break;
424: }
425: if (x instanceof ScriptLabel) {
426: ++nextIdx;
427: continue;
428: }
429: if (!(x instanceof ScriptGoto)) {
430: throw new RuntimeException("Unknown script element["
431: + nextIdx + "]: " + x);
432: }
433: nextIdx = ((ScriptGoto) x).getIndex();
434: if (++nGotos > 20) {
435: if (log.isErrorEnabled()) {
436: ScriptGoto sg = (ScriptGoto) script
437: .getEntry(1 + proc.getScriptIndex());
438: log.error("Possible infinite loop in script "
439: + script.getUID() + " beginning at "
440: + sg.getName());
441: }
442: return;
443: }
444: }
445:
446: // create fleshed-out step options
447: StepOptions nextOpts = createStepOptions(proc, startTime,
448: nextScriptStep);
449:
450: // create step
451: Step nextStep = mobilityTestFactory.createStep(nextOpts);
452: blackboard.publishAdd(nextStep);
453:
454: // update proc
455: proc.setScriptIndex(nextIdx);
456: proc.setMoveCount(1 + proc.getMoveCount());
457: proc.setStepUID(nextStep.getUID());
458: blackboard.publishChange(proc);
459:
460: // update entry
461: entry.step = nextStep;
462:
463: if (log.isInfoEnabled()) {
464: log.info("Created step #" + proc.getMoveCount()
465: + " at script index " + nextIdx + " (script: "
466: + script.getUID() + ", proc: " + proc.getUID()
467: + ", step: " + nextStep.getUID() + ")");
468: }
469: }
470:
471: private StepOptions createStepOptions(Proc proc,
472: long priorMoveStartTime, ScriptStep scriptStep) {
473: StepOptions opts = scriptStep.getStepOptions();
474: MessageAddress newTarget = opts.getTarget();
475: if (newTarget == null) {
476: newTarget = agentId;
477: }
478: Ticket ticket = opts.getTicket();
479: Object ticketId = mobilityFactory.createTicketIdentifier();
480: Ticket newTicket = new Ticket(ticketId,
481: ticket.getMobileAgent(), ticket.getOriginNode(), ticket
482: .getDestinationNode(), ticket.isForceRestart());
483: long nowTime = System.currentTimeMillis();
484: long newPauseTime = opts.getPauseTime();
485: if (scriptStep.hasFlag(ScriptStep.ADD_PAUSE)) {
486: newPauseTime += nowTime;
487: } else if (scriptStep.hasFlag(ScriptStep.PRI_PAUSE)) {
488: newPauseTime += (priorMoveStartTime > 0 ? priorMoveStartTime
489: : nowTime);
490: } else if (scriptStep.hasFlag(ScriptStep.REL_PAUSE)) {
491: newPauseTime += proc.getStartTime();
492: }
493: long newTimeoutTime = opts.getTimeoutTime();
494: if (scriptStep.hasFlag(ScriptStep.ADD_TIMEOUT)) {
495: newTimeoutTime += newPauseTime;
496: } else if (scriptStep.hasFlag(ScriptStep.PRI_TIMEOUT)) {
497: newTimeoutTime += (priorMoveStartTime > 0 ? priorMoveStartTime
498: : nowTime);
499: } else if (scriptStep.hasFlag(ScriptStep.REL_TIMEOUT)) {
500: newTimeoutTime += proc.getStartTime();
501: }
502: StepOptions ret = new StepOptions(proc.getUID(), agentId,
503: newTarget, newTicket, newPauseTime, newTimeoutTime);
504: return ret;
505: }
506:
507: private void removedScript(Script script) {
508: if (!(uidToEntry.isEmpty())) {
509: int n = uidToEntry.size();
510: Iterator iter = uidToEntry.values().iterator();
511: for (int i = 0; i < n; i++) {
512: Entry entry = (Entry) iter.next();
513: if (entry.script != script) {
514: continue;
515: }
516: // remove entry
517: iter.remove();
518: --n;
519: // cleanup
520: Step step = entry.step;
521: if (step != null) {
522: blackboard.publishRemove(step);
523: }
524: Proc proc = entry.proc;
525: if (procSub.contains(proc)) {
526: // kill proc! shouldn't happen if
527: // proc creator does cleanup.
528: blackboard.publishRemove(proc);
529: }
530: }
531: }
532: }
533:
534: private void recreateEntries() {
535: // this should be empty!
536: uidToEntry.clear();
537:
538: // recreate from blackboard contents
539: Collection c = procSub.getCollection();
540: if (!(c.isEmpty())) {
541: int n = c.size();
542: Iterator iter = c.iterator();
543: for (int i = 0; i < n; i++) {
544: Proc proc = (Proc) iter.next();
545: addedProc(proc);
546: }
547: }
548: }
549:
550: private static UnaryPredicate createStepPredicate(
551: final MessageAddress agentId) {
552: return new UnaryPredicate() {
553: public boolean execute(Object o) {
554: if (o instanceof Step) {
555: Step step = (Step) o;
556: MessageAddress target = step.getOptions()
557: .getTarget();
558: return agentId.equals(target);
559: }
560: return false;
561: }
562: };
563: }
564:
565: private static class Entry {
566:
567: public final Proc proc;
568: public final Script script;
569: public Step step;
570:
571: public Entry(Proc proc, Script script) {
572: this .proc = proc;
573: this .script = script;
574: if ((proc == null) || (script == null)) {
575: throw new IllegalArgumentException("null proc/script");
576: }
577: }
578:
579: public String toString() {
580: return "Entry {" + "\n procUID: " + proc.getUID()
581: + "\n proc: " + proc + "\n script: " + script
582: + "\n step: " + step + "\n}";
583: }
584: }
585:
586: }
|