001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.util;
028:
029: // soon to move to "org.cougaar.util.Trigger":
030:
031: /**
032: * A standard <code>TriggerModel</code> that batches "trigger()"
033: * requests for a single-threaded client <code>Trigger</code>,
034: * plus provides support for "suspend()/resume()" and
035: * "start()/stop()".
036: * <p>
037: * Only one "clientTrigger.trigger()" will be run at a time,
038: * so the client Trigger can be non-synchronized. Trigger
039: * requests are batched -- for example, the first client
040: * request for "this.trigger()" calls the trigger-registry's
041: * "trigger()" (queue), and further "this.trigger()" requests
042: * are ignored until the trigger-registry runs the client's
043: * trigger.
044: * <p>
045: * If the "clientTrigger.trigger()" throws an exception then
046: * "halt()" is called -- the model is suspended and stopped,
047: * and the exception is passed back to the trigger-registry.
048: * <p>
049: * Note that "suspend()" will wait for a running
050: * "clientTrigger.trigger()" to complete, and if the "suspend()"
051: * is interrupted then the behavior is as if the "suspend()" was
052: * never requested. The same logic goes for "stop()".
053: *
054: * @see TriggerModel
055: */
056: public final class SyncTriggerModelImpl implements TriggerModel {
057:
058: /**
059: * "Debug" flag; set to "true" for verbose output.
060: */
061: private static final boolean DEBUG = false;
062:
063: /**
064: * "Profile" flag; set to "true" for trigger/run statistics
065: * to be printed every five seconds.
066: */
067: private static final boolean PROFILE = false;
068:
069: private static final ProfileStats profileStats = (PROFILE ? (new ProfileStats())
070: : null);
071:
072: static {
073: if (PROFILE) {
074: Runnable r = new Runnable() {
075: public void run() {
076: while (true) {
077: System.out.println(profileStats);
078: try {
079: Thread.sleep(5000);
080: } catch (Exception e) {
081: }
082: }
083: }
084: };
085: Thread t = new Thread(r);
086: t.start();
087: }
088: }
089:
090: private final TriggerRegistry triggerRegistry;
091: private final Trigger clientTrigger;
092: private Trigger registryTrigger;
093:
094: private final Trigger innerTrigger = new Trigger() {
095: public void trigger() {
096: runInnerTrigger();
097: }
098:
099: public String toString() {
100: return SyncTriggerModelImpl.this .toString();
101: }
102: };
103:
104: private final Object stateLock = new Object();
105: private int state;
106:
107: // state flags:
108: private static final int TRIGGERED = (1 << 0);
109: private static final int QUEUED = (1 << 1);
110: private static final int RUNNING = (1 << 2);
111: private static final int SUSPENDED = (1 << 3);
112: private static final int STOPPED = (1 << 4);
113:
114: public SyncTriggerModelImpl(TriggerRegistry triggerRegistry,
115: Trigger clientTrigger) {
116: this .clientTrigger = clientTrigger;
117: this .triggerRegistry = triggerRegistry;
118: // null-check
119: if ((clientTrigger == null) || (triggerRegistry == null)) {
120: throw new NullPointerException();
121: }
122: // begin as "stopped", pending "start()"
123: state = STOPPED;
124: }
125:
126: // for the client's use
127: public void trigger() {
128: boolean enQ = false;
129: synchronized (stateLock) {
130: int ostate = state;
131: if (state == 0) {
132: state = (TRIGGERED | QUEUED);
133: enQ = true;
134: } else {
135: state |= TRIGGERED;
136: }
137: if (DEBUG) {
138: System.out.println(this + " trigger "
139: + getState(ostate) + " -> " + getState(state)
140: + ", enQ=" + enQ);
141: } else {
142: // the "ostate" is unused if (DEBUG==false), so the
143: // "int ostate = state" should be optimized away.
144: }
145: }
146: if (PROFILE) {
147: profileStats.addTrigger();
148: }
149: if (enQ) {
150: registryTrigger.trigger();
151: }
152: }
153:
154: public void initialize() {
155: // no-op
156: }
157:
158: public void load() {
159: // no-op
160: }
161:
162: public void start() {
163: boolean enQ = false;
164: synchronized (stateLock) {
165: if ((state & STOPPED) == 0) {
166: if (DEBUG) {
167: System.out.println(this + " <skip> start "
168: + getState(state));
169: }
170: return;
171: }
172: int ostate = state;
173: if ((state & RUNNING) == 0) {
174: // assert (registryTrigger == null);
175: registryTrigger = triggerRegistry
176: .register(innerTrigger);
177: if (registryTrigger == null) {
178: throw new NullPointerException(
179: "Unable to register " + clientTrigger
180: + " in registry " + triggerRegistry);
181: }
182: state &= ~STOPPED;
183: if (state == TRIGGERED) {
184: state = (TRIGGERED | QUEUED);
185: enQ = true;
186: }
187: } else {
188: state &= ~STOPPED;
189: }
190: if (DEBUG) {
191: System.out.println(this + " start " + getState(ostate)
192: + " -> " + getState(state) + ", enQ=" + enQ);
193: }
194: }
195: if (enQ) {
196: registryTrigger.trigger();
197: }
198: }
199:
200: // for trigger-registry callback's use, called from "innerTrigger"
201: private final void runInnerTrigger() {
202: synchronized (stateLock) {
203: // assert ((state & TRIGGERED) != 0);
204: int ostate = state;
205: if (state == (TRIGGERED | QUEUED)) {
206: state = RUNNING;
207: } else {
208: if ((state & (SUSPENDED | STOPPED)) != 0) {
209: state &= ~QUEUED;
210: if (DEBUG) {
211: System.out.println(this + " <skip> run "
212: + getState(ostate) + " -> "
213: + getState(state));
214: }
215: return;
216: }
217: state &= ~(TRIGGERED | QUEUED);
218: state |= RUNNING;
219: }
220: if (DEBUG) {
221: System.out.println(this + " run " + getState(ostate)
222: + " -> " + getState(state));
223: }
224: }
225: if (PROFILE) {
226: profileStats.addRun();
227: }
228: try {
229: clientTrigger.trigger();
230: } catch (Throwable die) {
231: synchronized (stateLock) {
232: state &= ~RUNNING;
233: halt();
234: }
235: if (die instanceof RuntimeException) {
236: throw (RuntimeException) die;
237: } else {
238: throw new RuntimeException(
239: "SyncTriggerModelImpl rethrowing "
240: + die.getMessage(), die);
241: }
242: }
243: boolean enQ = false;
244: synchronized (stateLock) {
245: int ostate = state;
246: if (state == RUNNING) {
247: state = 0;
248: } else if ((state & (SUSPENDED | STOPPED)) == 0) {
249: state &= ~RUNNING;
250: if (state == TRIGGERED) {
251: state = (TRIGGERED | QUEUED);
252: enQ = true;
253: }
254: } else {
255: state &= ~RUNNING;
256: if ((state & STOPPED) != 0) {
257: // assert (registryTrigger != null);
258: triggerRegistry.unregister(innerTrigger);
259: registryTrigger = null;
260: }
261: if (DEBUG) {
262: System.out.println(this + " notify "
263: + getState(ostate) + " -> "
264: + getState(state));
265: }
266: stateLock.notifyAll();
267: }
268: if (DEBUG) {
269: System.out.println(this + " ran " + getState(ostate)
270: + " -> " + getState(state) + ", enQ=" + enQ);
271: }
272: }
273: if (enQ) {
274: registryTrigger.trigger();
275: }
276: }
277:
278: public void suspend() {
279: synchronized (stateLock) {
280: if ((state & SUSPENDED) != 0) {
281: if (DEBUG) {
282: System.out.println(this + " <skip> suspend "
283: + getState(state));
284: }
285: return;
286: }
287: int ostate = state;
288: state |= SUSPENDED;
289: if (DEBUG) {
290: System.out.println(this + " suspend "
291: + getState(ostate) + " -> " + getState(state));
292: }
293: while ((state & RUNNING) != 0) {
294: try {
295: if (DEBUG) {
296: System.out.println(this
297: + " waiting for suspend "
298: + getState(state));
299: }
300: stateLock.wait();
301: if (DEBUG) {
302: System.out.println(this + " wake for suspend "
303: + getState(state));
304: }
305: } catch (InterruptedException ie) {
306: if ((state & RUNNING) != 0) {
307: state &= ~SUSPENDED;
308: System.err.println(clientTrigger
309: + " \"suspend()\" interrupted,"
310: + " cancelling the \"suspend()\"");
311: }
312: break;
313: }
314: }
315: }
316: }
317:
318: public void resume() {
319: boolean enQ = false;
320: synchronized (stateLock) {
321: int ostate = state;
322: if ((state & SUSPENDED) == 0) {
323: if (DEBUG) {
324: System.out.println(this + " <skip> resume "
325: + getState(state));
326: }
327: return;
328: }
329: state &= ~SUSPENDED;
330: if (state == TRIGGERED) {
331: state = (TRIGGERED | QUEUED);
332: enQ = true;
333: }
334: if (DEBUG) {
335: System.out.println(this + " resume " + getState(ostate)
336: + " -> " + getState(state) + ", enQ=" + enQ);
337: }
338: }
339: if (enQ) {
340: registryTrigger.trigger();
341: }
342: }
343:
344: public void stop() {
345: synchronized (stateLock) {
346: if ((state & STOPPED) != 0) {
347: if (DEBUG) {
348: System.out.println(this + " <skip> stop "
349: + getState(state));
350: }
351: return;
352: }
353: int ostate = state;
354: state |= STOPPED;
355: if ((state & RUNNING) == 0) {
356: // assert (registryTrigger != null);
357: triggerRegistry.unregister(innerTrigger);
358: registryTrigger = null;
359: } else {
360: while ((state & RUNNING) != 0) {
361: try {
362: if (DEBUG) {
363: System.out.println(this
364: + " waiting for stop "
365: + getState(state));
366: }
367: stateLock.wait();
368: if (DEBUG) {
369: System.out.println(this + " wake for stop "
370: + getState(state));
371: }
372: } catch (InterruptedException ie) {
373: if ((state & RUNNING) != 0) {
374: state &= ~STOPPED;
375: System.err.println(clientTrigger
376: + " \"stop()\" interrupted,"
377: + " cancelling the \"stop()\"");
378: }
379: break;
380: }
381: }
382: }
383: if (DEBUG) {
384: System.out.println(this + " stop " + getState(ostate)
385: + " -> " + getState(state));
386: }
387: }
388: }
389:
390: public void halt() {
391: synchronized (stateLock) {
392: suspend();
393: stop();
394: }
395: }
396:
397: public void unload() {
398: // no-op
399: }
400:
401: public int getModelState() {
402: synchronized (stateLock) {
403: if ((state & RUNNING) != 0) {
404: return ACTIVE;
405: } else if ((state & STOPPED) != 0) {
406: return LOADED;
407: } else if ((state & SUSPENDED) != 0) {
408: return IDLE;
409: } else {
410: return ACTIVE;
411: }
412: }
413: }
414:
415: private static String getState(int s) {
416: if (s == 0) {
417: return "I"; // Idle
418: } else {
419: StringBuffer buf = new StringBuffer(5);
420: if ((s & TRIGGERED) != 0) {
421: buf.append("T"); // Triggered
422: }
423: if ((s & QUEUED) != 0) {
424: buf.append("Q"); // Queued
425: }
426: if ((s & RUNNING) != 0) {
427: buf.append("R"); // Running
428: }
429: if ((s & SUSPENDED) != 0) {
430: buf.append("S"); // Suspended
431: }
432: if ((s & STOPPED) != 0) {
433: buf.append("D"); // Dead
434: }
435: return buf.toString();
436: }
437: }
438:
439: public String toString() {
440: int s = state;
441: return getState(s) + ":" + clientTrigger + ":"
442: + registryTrigger;
443: }
444:
445: // for internal PROFILE use:
446: private static class ProfileStats {
447: private int trigCounter = 1;
448: private int runCounter = 1;
449:
450: public ProfileStats() {
451: }
452:
453: public synchronized void addTrigger() {
454: trigCounter++;
455: }
456:
457: public synchronized void addRun() {
458: runCounter++;
459: }
460:
461: public synchronized String toString() {
462: return "\nTRIGGER-PROFILE {" + "\n #triggers: "
463: + trigCounter + "\n #runs: " + runCounter
464: + "\n runs/trigs: "
465: + (((double) runCounter) / trigCounter) + "\n}";
466: }
467: }
468: }
|