001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.agent.service.alarm;
028:
029: import java.util.ArrayList;
030: import java.util.Comparator;
031: import java.util.Iterator;
032: import java.util.ListIterator;
033:
034: import org.cougaar.bootstrap.SystemProperties;
035: import org.cougaar.core.service.ThreadService;
036: import org.cougaar.core.thread.Schedulable;
037: import org.cougaar.util.log.Logger;
038: import org.cougaar.util.log.Logging;
039:
040: /**
041: * Implement a basic timer class (not to be confused with
042: * {@link java.util.Timer}) that invokes Alarm "expire()" methods
043: * when they are due.
044: * <p>
045: * The base class operated on System time, but subclasses may operate
046: * on different scales.
047: * <p>
048: * Visible feedback may be controlled by standard logging for class:<pre>
049: * org.cougaar.core.agent.service.alarm.Timer:
050: * WARN also enables logging of when (real-time only) alarms are more than Epsilon millis late
051: * INFO also enables logging of when alarms take more than Epsilon millis to ring
052: * DEBUG also enables reports of every alarm ringing.
053: * </pre>
054: * <p>
055: * Subclasses may override the feedback printed.
056: *
057: * @property org.cougaar.core.agent.service.alarm.Timer.epsilon=10000 milliseconds
058: * considered a relatively long time for alarm delivery.
059: * @property org.cougaar.core.agent.service.alarm.Timer.useSchedulable=true Set to false
060: * to use in-band delivery of alarm sounding rather than using a schedulable to wrap the
061: * delivery.
062: */
063: public abstract class Timer implements Runnable {
064: protected final static Logger log = Logging.getLogger(Timer.class);
065:
066: protected static final long EPSILON = SystemProperties.getLong(
067: "org.cougaar.core.agent.service.alarm.Timer.epsilon",
068: 10 * 1000L);
069: protected static final boolean USE_SCHEDULABLE = SystemProperties
070: .getBoolean(
071: "org.cougaar.core.agent.service.alarm.Timer.useSchedulable",
072: true);
073:
074: /** all alarms */
075: // this could be optimized to use a heap
076: private final ArrayList alarms = new ArrayList();
077:
078: /** Pending Periodic Alarms.
079: * PeriodicAlarms which have gone off but
080: * need to be added back on. These are collected and added
081: * back in a second pass so that we don't get terrible behavior
082: * if someone abuses a periodic alarm
083: */
084: // only modified in the run loop
085: private final ArrayList ppas = new ArrayList();
086:
087: /** Pending Alarms.
088: * alarms which need to be rung, but we haven't gotten around to yet.
089: */
090: // only modified in the run loop thread
091: private final ArrayList pas = new ArrayList();
092:
093: private static final Comparator COMPARATOR = new Comparator() {
094: public int compare(Object a, Object b) {
095: long ta = ((Alarm) a).getExpirationTime();
096: long tb = ((Alarm) b).getExpirationTime();
097: if (ta > tb)
098: return 1;
099: if (ta == tb)
100: return 0;
101: return -1;
102: }
103: };
104:
105: protected final Object sem = new Object();
106:
107: private Schedulable schedulable;
108: private ThreadService threadService = null;
109:
110: public Timer() {
111: }
112:
113: public void start(ThreadService tsvc) {
114: threadService = tsvc;
115: schedulable = tsvc.getThread(this , this , getName()); // lane?
116: // should be nothing on the queue yet, so no need to start.
117: // On the other hand, starting is harmless, since it will quit
118: // immediately if the queue is empty.
119: // For now we don't do a "schedulable.start()"
120: if (log.isDebugEnabled()) {
121: log.debug("Started");
122: }
123: }
124:
125: public void stop() {
126: if (log.isDebugEnabled()) {
127: log.debug("Stop timer");
128: }
129: synchronized (sem) {
130: Iterator it = alarms.iterator();
131: while (it.hasNext()) {
132: Alarm alarm = (Alarm) it.next();
133: if (alarm == null) {
134: continue;
135: }
136: it.remove();
137: }
138: }
139: //schedulable.cancel();
140: //schedulable = null;
141: //threadService = null;
142: }
143:
144: public long currentTimeMillis() {
145: return System.currentTimeMillis();
146: }
147:
148: public void addAlarm(Alarm alarm) {
149: if (log.isDebugEnabled()) {
150: log.debug("addAlarm(" + alarm + ")");
151: }
152: synchronized (sem) {
153: insert(alarm);
154: }
155: requestRun();
156: }
157:
158: public void cancelAlarm(Alarm alarm) {
159: if (log.isDebugEnabled()) {
160: log.debug("cancelAlarm(" + alarm + ")");
161: }
162: synchronized (sem) {
163: alarms.remove(alarm);
164: }
165: requestRun();
166: }
167:
168: protected double getRate() {
169: return 1.0;
170: }
171:
172: /**
173: * Override this to specify time before next rate change. It is
174: * always safe to underestimate.
175: */
176: protected long getMaxWait() {
177: return 10000000000L; // A long time
178: }
179:
180: protected String getName() {
181: return "Timer";
182: }
183:
184: protected void requestRun() {
185: schedulable.start();
186: }
187:
188: protected void report(Alarm alarm) {
189: if (log.isDebugEnabled()) {
190: log.debug("Ringing " + alarm);
191: }
192: }
193:
194: // must be called within sync(sem)
195: private Alarm peekAlarm() {
196: if (alarms.isEmpty())
197: return null;
198: else
199: return (Alarm) alarms.get(0);
200: }
201:
202: // must be called within sync(sem)
203: private Alarm nextAlarm() {
204: if (alarms.isEmpty())
205: return null;
206: Alarm top = (Alarm) alarms.get(0);
207: if (top != null)
208: alarms.remove(0);
209: if (alarms.isEmpty())
210: return null;
211: return (Alarm) alarms.get(0);
212: }
213:
214: // must be called only within a sync(sem)
215: private void insert(Alarm alarm) {
216: if (log.isDebugEnabled()) {
217: log.debug("insert(" + alarm + ")");
218: }
219: boolean added = false;
220: // find the right insertion point
221: ListIterator i = alarms.listIterator(0);
222: if (i.hasNext()) {
223: while (i.hasNext()) {
224: Alarm cur = (Alarm) i.next();
225: // stop if the alarm is < the current insertion point
226: if (COMPARATOR.compare(alarm, cur) < 0) {
227: i.previous(); // back up one step
228: i.add(alarm); // add before cur
229: added = true;
230: break;
231: }
232: }
233: }
234: if (!added) {
235: // no elements were greater, add at end
236: alarms.add(alarm);
237: }
238: if (log.isDetailEnabled()) {
239: synchronized (sem) {
240: log.detail("Alarms = " + alarms);
241: }
242: }
243: }
244:
245: // only called by scheduler
246: public void run() {
247: if (log.isDetailEnabled()) {
248: log.detail("run");
249: }
250: schedulable.cancelTimer(); // cancel any outstanding timed restarts
251: while (true) {
252: long time;
253: synchronized (sem) {
254: Alarm top = peekAlarm();
255:
256: if (top == null) {
257: // no pending events?
258: // new events will restart us
259: return;
260: }
261:
262: // figure out how long to wait
263: {
264: long delta = top.getExpirationTime()
265: - currentTimeMillis();
266: double rate = getRate();
267: long maxWait = getMaxWait();
268: if (rate > 0.0) {
269: delta = Math
270: .min((long) (delta / rate), maxWait);
271: } else { // Time is standing still
272: delta = maxWait; // Wait until next significant change in timer
273: }
274: if (log.isDetailEnabled()) {
275: log.detail("delta is " + delta + " for top "
276: + top);
277: }
278: if (delta > 0) {
279: if (delta < 100)
280: delta = 100; // min of .1 second wait time
281: schedulable.schedule(delta); // restart after delta ms
282: return;
283: // sem.wait(delta);
284: }
285: }
286:
287: // fire some alarms
288: top = peekAlarm();
289: time = currentTimeMillis();
290: while (top != null && time >= top.getExpirationTime()) {
291: pas.add(top);
292: top = nextAlarm();
293: }
294:
295: if (log.isDetailEnabled()) {
296: log.detail("pas is " + pas);
297: }
298:
299: } // sync(sem)
300:
301: // now ring any outstanding alarms: outside the sync
302: // just in case an alarm ringer tries setting another alarm!
303: {
304: int l = pas.size();
305: for (int i = 0; i < l; i++) {
306: Alarm top = (Alarm) pas.get(i);
307: try {
308: ring(top);
309: } catch (Throwable e) {
310: log.error("Alarm " + top
311: + " generated Exception", e);
312: // cancel error generating alarms to be certain.
313: top.cancel();
314: }
315:
316: // handle periodic alarms
317: if (top instanceof PeriodicAlarm) {
318: ppas.add(top); // consider adding it back later
319: }
320: }
321: pas.clear();
322: }
323:
324: // back in sync, reset any periodic alarms
325: synchronized (sem) {
326: // reset periodic alarms
327: int l = ppas.size();
328: for (int i = 0; i < l; i++) {
329: PeriodicAlarm ps = (PeriodicAlarm) ppas.get(i);
330: ps.reset(time); // reset it
331: if (!ps.hasExpired()) { // if it hasn't expired, add it back to the queue
332: insert(ps);
333: }
334: }
335: ppas.clear();
336: } // sync(sem)
337: } // infinite loop
338: }
339:
340: private void ring(final Alarm alarm) {
341: if (alarm.hasExpired()) {
342: // already cancelled
343: return;
344: }
345: if (!USE_SCHEDULABLE && threadService == null) {
346: // ring in our thread
347: reallyRing(alarm);
348: return;
349: }
350: // ring in pooled thread
351: Schedulable quasimodo = threadService.getThread(this ,
352: new Runnable() {
353: public void run() {
354: reallyRing(alarm);
355: }
356: }, "Alarm Ringer");
357: quasimodo.start();
358: }
359:
360: private void reallyRing(Alarm alarm) {
361: report(alarm);
362: long dt = 0L;
363: try {
364: dt = System.currentTimeMillis(); // real start time
365: alarm.expire();
366: dt = System.currentTimeMillis() - dt; // real delta time
367: //
368: if (dt > EPSILON) {
369: if (log.isWarnEnabled()) {
370: log.warn("Alarm " + alarm + " blocked for " + dt
371: + "ms while ringing");
372: }
373: }
374: } finally {
375: // see if the alarm has been evil and as has opened a transaction
376: // but neglected to close it
377: if (org.cougaar.core.blackboard.Subscriber
378: .abortTransaction()) {
379: log.error("Alarm " + alarm
380: + " failed to close it's transaction");
381: }
382: }
383: }
384: }
|