001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.core;
026:
027: import seda.sandStorm.api.*;
028: import seda.util.*;
029:
030: /**
031: * The ssTimer class provides a mechanism for registering
032: * timer events that will go off at some future time. The future time
033: * can be specified in absolute or relative terms. When the timer goes
* off, an element is placed on a queue. A registered timer can be
* cancelled with cancelEvent. Events that have not been cancelled are
* guaranteed to be delivered, but the time at which they are delivered
* may slip depending on factors such as how loaded the system is.
038: * <P>
039: * WARNING: you should use cancelEvent to cancel timers that you no longer
040: * need, otherwise you will waste many, many cycles on unneeded timer
041: * firings. This was the bottleneck in vSpace and the DDS before we
042: * fixed it. For example, if you set a timer to go off on a cross-CPU
043: * task to detect failure, then if the task returns successfully, cancel
044: * the timer!
045: *
046: * @author Matt Welsh and Steve Gribble
047: */
048:
049: public class ssTimer implements Runnable, ProfilableIF {
050:
051: private static final boolean DEBUG = false;
052: private ssTimerEvent head_event = null;
053: private ssTimerEvent tail_event = null;
054: private Thread thr;
055: private Object sync_o;
056: private boolean die_thread;
057: private int num_events = 0;
058:
059: public ssTimer() {
060: sync_o = new Object();
061: die_thread = false;
062: thr = new Thread(this , "SandStorm ssTimer thread");
063: thr.start();
064: }
065:
066: public static class ssTimerEvent {
067: public long time_millis;
068: public QueueElementIF obj;
069: public SinkIF queue;
070: public ssTimerEvent nextE;
071: public ssTimerEvent prevE;
072:
073: public ssTimerEvent(long m, QueueElementIF o, SinkIF q) {
074: time_millis = m;
075: obj = o;
076: queue = q;
077: nextE = null;
078: prevE = null;
079: }
080:
081: public String toString() {
082: return "ssTimerEvent<" + hashCode() + ">";
083: }
084: }
085:
086: /**
087: * Object <code>obj</code> will be placed on SinkIF <code>queue</code>
088: * no earlier than <code>millis</code> milliseconds from now.
089: *
090: * @param millis the number of milliseconds from now when the event will
091: * take place
092: * @param obj the object that will be placed on the queue
093: * @param queue the queue on which the object will be placed
094: */
095: public ssTimer.ssTimerEvent registerEvent(long millis,
096: QueueElementIF obj, SinkIF queue) {
097: long time_millis = System.currentTimeMillis() + millis;
098: ssTimerEvent newTimer = new ssTimerEvent(time_millis, obj,
099: queue);
100:
101: insertEvent(newTimer);
102:
103: return newTimer;
104: }
105:
106: /**
107: * Object <code>obj</code> will be placed on SinkIF <code>queue</code>
108: * no earlier than absolute time <code>the_date</code>.
109: *
110: * @param the_date the date when the event will take place - if this date
111: * is in the past, the event will happen right away
112: * @param obj the object that will be placed on the queue
113: * @param queue the queue on which the object will be placed
114: */
115: public ssTimer.ssTimerEvent registerEvent(java.util.Date the_date,
116: QueueElementIF obj, SinkIF queue) {
117: ssTimerEvent newTimer = new ssTimerEvent(the_date.getTime(),
118: obj, queue);
119: insertEvent(newTimer);
120:
121: return newTimer;
122: }
123:
124: /**
125: * Kills off this timer object, dropping all pending events on floor.
126: */
127: public void doneWithTimer() {
128: die_thread = true;
129:
130: synchronized (sync_o) {
131: sync_o.notify();
132: }
133: }
134:
135: /**
136: * How many events yet to fire?
137: */
138: public int size() {
139: return num_events;
140: }
141:
142: /**
143: * Return the profile size of this timer.
144: */
145: public int profileSize() {
146: return size();
147: }
148:
149: /**
150: * Cancels all events.
151: */
152: public void cancelAll() {
153: synchronized (sync_o) {
154: head_event = tail_event = null;
155: num_events = 0;
156: }
157: }
158:
159: /**
160: * Cancels the firing of this timer event.
161: *
162: * @param evt the ssTimer.ssTimerEvent to cancel. This ssTimerEvent
163: * is returned to you when you call registerEvent
164: */
165: public void cancelEvent(ssTimerEvent evt) {
166: if (evt == null)
167: return;
168:
169: try {
170: synchronized (sync_o) {
171: if (evt == tail_event) {
172:
173: // is this only list item?
174: if (tail_event == head_event) {
175: tail_event = head_event = null;
176: num_events--;
177: return;
178: }
179:
180: // not only list item, is at tail, so lop off tail
181: tail_event = tail_event.prevE;
182: tail_event.nextE = null;
183: num_events--;
184: return;
185:
186: } else if (evt == head_event) {
187:
188: // not only list item, is at head, so lop off head
189: head_event = head_event.nextE;
190: head_event.prevE = null;
191: num_events--;
192: return;
193:
194: } else {
195:
196: // make sure event didn't fire already
197: if ((evt.prevE != null) && (evt.nextE != null)) {
198: // in middle somewhere
199: evt.prevE.nextE = evt.nextE;
200: evt.nextE.prevE = evt.prevE;
201: num_events--;
202: return;
203: }
204: }
205: }
206: } finally {
207: evt.nextE = null;
208: evt.prevE = null;
209: }
210: }
211:
212: // takes the event, does insertion-sort into ssTimerEvent linked list
213: private void insertEvent(ssTimerEvent newTimer) {
214: boolean do_notify = false;
215:
216: synchronized (sync_o) {
217: if (head_event == null) {
218: // list empty
219: if (DEBUG)
220: System.err
221: .println("ssTimer: Inserting first event, num pending "
222: + num_events + " event " + newTimer);
223: tail_event = newTimer;
224: head_event = newTimer;
225: num_events++;
226: do_notify = true;
227: } else if (head_event.time_millis > newTimer.time_millis) {
228: // insert head
229: if (DEBUG)
230: System.err
231: .println("ssTimer: Inserting event at head, num pending "
232: + num_events + " event " + newTimer);
233: newTimer.nextE = head_event;
234: head_event.prevE = newTimer;
235: head_event = newTimer;
236: num_events++;
237: do_notify = true;
238: } else if (tail_event.time_millis <= newTimer.time_millis) {
239: // insert tail
240: if (DEBUG)
241: System.err
242: .println("ssTimer: Inserting event at tail, num pending "
243: + num_events + " event " + newTimer);
244: newTimer.prevE = tail_event;
245: tail_event.nextE = newTimer;
246: tail_event = newTimer;
247: num_events++;
248: // if not insert at head, no notify! :)
249: } else {
250: // insert somewhere in middle :(
251: if (DEBUG)
252: System.err
253: .println("ssTimer: Inserting new event in middle, num pending "
254: + num_events + " event " + newTimer);
255: ssTimerEvent prevE = tail_event.prevE;
256: ssTimerEvent curE = tail_event;
257: boolean gotit = false;
258: while ((prevE != null) && (gotit == false)) {
259: if (prevE.time_millis <= newTimer.time_millis) {
260: prevE.nextE = newTimer;
261: curE.prevE = newTimer;
262: newTimer.nextE = curE;
263: newTimer.prevE = prevE;
264: // if not insert at head, no notify! :)
265: gotit = true;
266: }
267: curE = prevE;
268: prevE = prevE.prevE;
269: }
270: num_events++;
271: }
272:
273: if (do_notify) {
274: sync_o.notify();
275: }
276: }
277: }
278:
279: private void process_head() {
280:
281: ssTimerEvent fire = null;
282: long wait_time = -1;
283:
284: long curTime = System.currentTimeMillis();
285:
286: if (head_event.time_millis <= curTime) {
287: // fire off event
288: fire = head_event;
289: if (DEBUG)
290: System.err.println("Firing " + fire + " -> "
291: + head_event.nextE + " " + (num_events - 1)
292: + " pending");
293: head_event = head_event.nextE;
294: if (head_event == null) {
295: // was only event
296: tail_event = null;
297: } else {
298: // reset back pointer
299: head_event.prevE = null;
300: }
301:
302: if ((head_event == null) && (num_events != 1)) {
303: System.err
304: .println("ssTimer: Warning: No more events to process, but still have "
305: + (num_events - 1)
306: + " pending. This is a bug; please contact <mdw@cs.berkeley.edu>");
307: }
308:
309: fire.nextE = null;
310: fire.prevE = null;
311: num_events--;
312:
313: } else {
314: // sleep till head
315: if (DEBUG)
316: System.err.println("ssTimer: head is "
317: + (head_event.time_millis - curTime)
318: + " ms in the future");
319: wait_time = head_event.time_millis - curTime;
320: if (wait_time != -1) {
321: try {
322: sync_o.wait(wait_time);
323: } catch (InterruptedException ie) {
324: // Ignore
325: }
326: }
327: }
328:
329: if (fire != null) {
330: fire.queue.enqueue_lossy(fire.obj);
331: }
332: }
333:
334: public void run() {
335: synchronized (sync_o) {
336: while (die_thread == false) {
337: try {
338: if (head_event != null) {
339: process_head();
340: } else {
341: if (die_thread == true)
342: return;
343:
344: try {
345: sync_o.wait(500);
346: } catch (InterruptedException ie) {
347: }
348: }
349: } catch (Throwable t) {
350: t.printStackTrace();
351: }
352: }
353: }
354: }
355:
356: private static class GQEString implements QueueElementIF {
357: private String ns = null;
358: private long inj;
359:
360: public GQEString(String f) {
361: ns = f;
362: inj = System.currentTimeMillis();
363: }
364:
365: public String toString() {
366: return ns + " elapsed="
367: + (System.currentTimeMillis() - inj);
368: }
369: }
370:
371: public static void main(String args[]) {
372: FiniteQueue q = new FiniteQueue();
373: ssTimer te = new ssTimer();
374: ssTimer.ssTimerEvent t1, t10, t20, t30, t40, t50, t250, t500, t2500, t1500, t3500, t15000, t8000;
375:
376: System.out.println("adding 1 millisecond event");
377: t1 = te.registerEvent(1, new GQEString("1"), q);
378: System.out.println("adding 10 millisecond event");
379: t10 = te.registerEvent(10, new GQEString("10"), q);
380: System.out.println("adding 20 millisecond event");
381: t20 = te.registerEvent(20, new GQEString("20"), q);
382: System.out.println("adding 30 millisecond event");
383: t30 = te.registerEvent(30, new GQEString("30"), q);
384: System.out.println("adding 40 millisecond event");
385: t40 = te.registerEvent(40, new GQEString("40"), q);
386: System.out.println("adding 50 millisecond event");
387: t50 = te.registerEvent(50, new GQEString("50"), q);
388: System.out.println("adding 250 millisecond event");
389: t250 = te.registerEvent(250, new GQEString("250"), q);
390: System.out.println("adding 500 millisecond event");
391: t500 = te.registerEvent(500, new GQEString("500"), q);
392: System.out.println("adding 2500 millisecond event");
393: t2500 = te.registerEvent(2500, new GQEString("2500"), q);
394: System.out.println("adding 1500 millisecond event");
395: t1500 = te.registerEvent(1500, new GQEString("1500"), q);
396: System.out.println("adding 3500 millisecond event");
397: t3500 = te.registerEvent(3500, new GQEString("3500"), q);
398: System.out.println("adding 15000 millisecond event");
399: t15000 = te.registerEvent(15000, new GQEString("15000"), q);
400: System.out.println("adding 8000 millisecond event");
401: t8000 = te.registerEvent(8000, new GQEString("8000"), q);
402:
403: int num_got = 0;
404: while (num_got < 13) {
405: QueueElementIF nextEl[] = q.dequeue_all();
406:
407: if (nextEl != null) {
408: num_got += nextEl.length;
409: System.out.println("got " + nextEl.length + " event"
410: + (nextEl.length > 1 ? "s" : ""));
411: for (int i = 0; i < nextEl.length; i++)
412: System.out.println(" " + i + ": " + nextEl[i]);
413: System.out.println("total num got so far is: "
414: + num_got);
415: System.out.println("num remain is: " + te.size());
416: if (num_got == 3)
417: te.cancelEvent(t2500);
418: } else {
419: try {
420: Thread.currentThread().sleep(5);
421: } catch (InterruptedException ie) {
422: }
423: }
424: }
425:
426: te.doneWithTimer();
427: }
428: }
|