001: package com.quadcap.util.threads;
002:
003: /* Copyright 1997 - 2003 Quadcap Software. All rights reserved.
004: *
005: * This software is distributed under the Quadcap Free Software License.
006: * This software may be used or modified for any purpose, personal or
007: * commercial. Open Source redistributions are permitted. Commercial
008: * redistribution of larger works derived from, or works which bundle
009: * this software requires a "Commercial Redistribution License"; see
010: * http://www.quadcap.com/purchase.
011: *
012: * Redistributions qualify as "Open Source" under one of the following terms:
013: *
014: * Redistributions are made at no charge beyond the reasonable cost of
015: * materials and delivery.
016: *
017: * Redistributions are accompanied by a copy of the Source Code or by an
018: * irrevocable offer to provide a copy of the Source Code for up to three
019: * years at the cost of materials and delivery. Such redistributions
020: * must allow further use, modification, and redistribution of the Source
021: * Code under substantially the same terms as this license.
022: *
023: * Redistributions of source code must retain the copyright notices as they
024: * appear in each source code file, these license terms, and the
025: * disclaimer/limitation of liability set forth as paragraph 6 below.
026: *
027: * Redistributions in binary form must reproduce this Copyright Notice,
028: * these license terms, and the disclaimer/limitation of liability set
029: * forth as paragraph 6 below, in the documentation and/or other materials
030: * provided with the distribution.
031: *
032: * The Software is provided on an "AS IS" basis. No warranty is
033: * provided that the Software is free of defects, or fit for a
034: * particular purpose.
035: *
036: * Limitation of Liability. Quadcap Software shall not be liable
037: * for any damages suffered by the Licensee or any third party resulting
038: * from use of the Software.
039: */
040:
041: import java.io.ByteArrayOutputStream;
042: import java.io.PrintWriter;
043:
044: import java.util.Date;
045: import java.util.Enumeration;
046: import java.util.Hashtable;
047: import java.util.Vector;
048:
049: import com.quadcap.util.DList;
050: import com.quadcap.util.DListItem;
051: import com.quadcap.util.Debug;
052: import com.quadcap.util.ListException;
053:
054: /**
055: * This class acts as a scheduler for periodic tasks, which are named and
056: * execute repeatedly with a specified interval. Each task is a
057: * <a href="com.quadcap.util.threads.Command.html#_top_">Command</a>
058: * object, as used by e.g.,
059: * <a href="com.quadcap.util.threads.StreamWorker.html#_top_">
060: * StreamWorker</a>.<p>
061: *
062: * Two flavors of this class are expressed here -- the first class
063: * takes a <A href="com.quadcap.util.threads.Stream.html#_top_">Stream</a>
064: * parameter. When each task becomes ready, the Command object is
065: * written to the Stream, and it is presumed that a StreamWorker task
066: * will be sitting at the other end of the Stream, executing the
067: * Commands that come down the pipe.<p>
068: *
069: * The other flavor of this class takes a context argument, which can
070: * be any Java object (i.e., this class doesn't ever look at the context.)
071: * When each task becomes ready, it is simply executed within the
072: * PeriodicScheduler thread.
073: *
074: * @author Stan Bailes
075: */
076: public class PeriodicScheduler extends Thread implements DebugMonitor {
077: DList active = new DList();
078: Stream stream = null;
079: Object context = null;
080: boolean terminate = false;
081: Hashtable schedule = new Hashtable();
082:
083: /**
084: * Construct a new PeriodicScheduler which will write Commands to the
085: * specified Stream.
086: *
087: * @param stream the Stream used to write Commands when they become
088: * ready.
089: */
090: public PeriodicScheduler(ThreadGroup group, String name, Object obj) {
091: super (group, name);
092: if (obj instanceof Stream) {
093: this .stream = (Stream) obj;
094: } else {
095: this .context = obj;
096: }
097: setDaemon(true);
098: }
099:
100: /**
101: * Add a new task, with the specified name, action, and interval.
102: * The task will <b>NOT</b> be executed 'now'; rather its first
103: * execution will be scheduled at time 'now + interval'.
104: *
105: * @param name the name of this task. Any existing task with the
106: * same name is replaced by this one.
107: * @param c the Command action associated with this task.
108: * @param interval the interval (in ms) between invocations of this
109: * task.
110: */
111: public void add(String name, Command c, long interval) {
112: //Debug.println(3, "add " + name);
113: synchronized (active) {
114: ScheduleItem s = (ScheduleItem) schedule.get(name);
115: if (s == null) {
116: s = new ScheduleItem(name);
117: schedule.put(name, s);
118: } else {
119: remove(s);
120: }
121: s.command = c;
122: s.interval = interval;
123: schedule(s);
124: }
125: }
126:
127: /**
128: * Add a new task, with the specified name, action, and interval.
129: * The task will <b>NOT</b> be executed 'now'; rather its first
130: * execution will be scheduled at time 'now + interval'.
131: *
132: * @param name the name of this task. Any existing task with the
133: * same name is replaced by this one.
134: * @param r the Runnable action associated with this task.
135: * @param interval the interval (in ms) between invocations of this
136: * task.
137: */
138: public void add(String name, Runnable r, long interval) {
139: //Debug.println(3, "add " + name);
140: synchronized (active) {
141: ScheduleItem s = (ScheduleItem) schedule.get(name);
142: if (s == null) {
143: s = new ScheduleItem(name);
144: schedule.put(name, s);
145: } else {
146: remove(s);
147: }
148: s.runnable = r;
149: s.interval = interval;
150: schedule(s);
151: }
152: }
153:
154: void remove(ScheduleItem item) {
155: //Debug.println(3, "remove " + item.name);
156: synchronized (active) {
157: DListItem head = null;
158: try {
159: head = active.head();
160: } catch (ListException e) {
161: }
162: DListItem d = head;
163: if (d != null)
164: do {
165: ScheduleItem s = (ScheduleItem) d.obj;
166: if (item == s) {
167: active.unlink(d);
168: break;
169: }
170: d = d.next;
171: } while (d != head);
172: active.notify();
173: }
174: }
175:
176: /**
177: * Add the specified schedule item to the queue.
178: *
179: * @param item the item to schedule
180: */
181: void schedule(ScheduleItem item) {
182: //Debug.println(6, "schedule " + item.name);
183: item.checkDate(new Date());
184: synchronized (active) {
185: DListItem head = null;
186: try {
187: head = active.head();
188: } catch (ListException e) {
189: }
190: DListItem d = head;
191: if (d == null
192: || item.date.before(((ScheduleItem) d.obj).date)) {
193: active.addFront(item);
194: } else {
195: while (d.next != head) {
196: d = d.next;
197: ScheduleItem s = (ScheduleItem) d.obj;
198: if (item.date.before(s.date))
199: break;
200: }
201: active.addBefore(d, item);
202: }
203: active.notify();
204: }
205: }
206:
207: /**
208: * Thread method, runs the main scheduler event loop: Get the next
209: * task to execute (the active list is sorted by execution time) and
210: * wait that long. If another thread puts a new task in the queue,
211: * we'll get interrupted, but than's ok. Then, look at the head of the
212: * active list again and, if the task is ready, execute it.
213: */
214: public void run() {
215: //Debug.println(2, "starting: " + getName());
216: while (!terminate) {
217: ScheduleItem s = null;
218: synchronized (active) {
219: DListItem head = null;
220: try {
221: head = active.head();
222: } catch (ListException e) {
223: }
224:
225: try {
226: if (head == null) {
227: active.wait();
228: } else {
229: long ms = msUntil(((ScheduleItem) head.obj).date);
230: //Debug.println(6, "Next scheduled event in " + ms + " ms");
231: if (ms > 0)
232: active.wait(ms);
233: }
234: } catch (InterruptedException e) {
235: }
236:
237: try {
238: head = active.head();
239: s = (ScheduleItem) head.obj;
240: if (s.ready()) {
241: active.popFront();
242: } else {
243: s = null;
244: }
245: } catch (ListException e) {
246: s = null;
247: }
248: }
249:
250: if (s != null) {
251: try {
252: if (stream != null) {
253: //Debug.println(6, "Dispatching: " + s.name);
254: if (s.runnable != null) {
255: stream.write(s.runnable);
256: } else {
257: stream.write(s.command);
258: }
259: } else if (context != null) {
260: //Debug.println(6, "Running: " + s.name);
261: if (s.runnable != null) {
262: if (!s.isRunning()) {
263: new Thread(s.runnable).start();
264: }
265: } else {
266: s.command.execute(context);
267: }
268: //Debug.println(6, "Done: " + s.name);
269: }
270: } catch (Exception e) {
271: Debug.print(e);
272: }
273: schedule(s);
274: }
275: }
276: //Debug.println(2, "terminating " + getName());
277: }
278:
279: /**
280: * Handle a debug monitor command directed at this instance.
281: */
282: public int handleInteractiveCommand(String cmd, Vector args) {
283: if (cmd.equals("list")) {
284: Enumeration enum = schedule.elements();
285: while (enum.hasMoreElements()) {
286: ScheduleItem s = (ScheduleItem)enum.nextElement();
287: System.out.println("agent: " + s.name + ", interval = " +
288: s.interval + " ms, next scheduled: " +
289: s.date);
290: }
291: } else if (cmd.equals("queue")) {
292: synchronized(active) {
293: DListItem head = null;
294: try {
295: head = active.head();
296: } catch (ListException e) {
297: }
298: DListItem d = head;
299: if (d != null) {
300: do {
301: ScheduleItem s = (ScheduleItem)d.obj;
302: System.out.println("agent: " + s.name +
303: ", interval = " +
304: s.interval +
305: " ms, next scheduled: " +
306: s.date);
307: d = d.next;
308: } while (d != head);
309: }
310: }
311: } else {
312: System.out.println("usage: ");
313: System.out.println(" list");
314: return -1;
315: }
316: return 0;
317: }
318:
319: public String toString() {
320: ByteArrayOutputStream bos = new ByteArrayOutputStream();
321: PrintWriter pos = new PrintWriter(bos);
322: synchronized (active) {
323: DListItem head = null;
324: try {
325: head = active.head();
326: } catch (ListException e) {
327: }
328: DListItem d = head;
329: if (d != null) {
330: do {
331: ScheduleItem s = (ScheduleItem) d.obj;
332: pos.println("agent: " + s.name);
333: pos.println(" interval: " + s.interval + " ms");
334: pos.println(" next: " + s.date);
335: pos.println(" running: " + s.isRunning());
336: d = d.next;
337: } while (d != head);
338: }
339: }
340: pos.flush();
341: return bos.toString();
342: }
343:
344: /**
345: * How many ms between now and then (d)?
346: *
347: * @param d then
348: * @return then - now
349: */
350: long msUntil(Date d) {
351: long now = new Date().getTime();
352: long then = d.getTime();
353: return then - now;
354: }
355:
356: /**
357: * Terminate this scheduler.
358: */
359: public void terminate() {
360: terminate = true;
361: synchronized (active) {
362: active.notify();
363: }
364: }
365:
366: }
367:
368: /**
369: * This class is private to PeriodicScheduler: it represents a single
370: * scheduled item, and gets placed in the queue and the hashtable.
371: */
372: class ScheduleItem implements Runnable {
373: Command command = null;
374: Runnable runnable = null;
375: String name = null;
376: Date date = new Date();
377: long interval = 5000;
378: boolean running = false;
379:
380: ScheduleItem(String name) {
381: this .name = name;
382: }
383:
384: void checkDate(Date now) {
385: if (interval < 0)
386: throw new RuntimeException("negative interval");
387: //date.setTime(now.getTime() + interval);
388: date.setTime(date.getTime() + interval);
389: if (date.before(now))
390: date.setTime(now.getTime() + interval);
391: }
392:
393: boolean ready() {
394: return date.before(new Date());
395: }
396:
397: boolean isRunning() {
398: return running;
399: }
400:
401: public void run() {
402: running = true;
403: try {
404: runnable.run();
405: } catch (Throwable t) {
406: Debug.print(t);
407: } finally {
408: running = false;
409: }
410: }
411: }
|