001: /*
002: * Scheduler.java February 2001
003: *
004: * Copyright (C) 2001, Niall Gallagher <niallg@users.sf.net>
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation.
009: *
010: * This library is distributed in the hope that it will be useful,
011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013: * GNU Lesser General Public License for more details.
014: *
015: * You should have received a copy of the GNU Lesser General
016: * Public License along with this library; if not, write to the
017: * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
018: * Boston, MA 02111-1307 USA
019: */
020:
021: package simple.util.schedule;
022:
023: /**
024: * The <code>Scheduler</code> is used for scheduling arbitrary objects.
025: * When arbitrary objects are enqueued into a <code>Scheduler</code>
026: * the <code>Scheduler</code> object will guarantee that that object
027: * will not be dequeued until its timeout has expired.
028: * <p>
029: * This guarantees that an object is returned when the dequeue method
030: * is invoked. If there are no objects within the <code>Scheduler</code>
031: * then the thread that invoked this dequeue method will wait until there
032: * is an object to dequeue. If however there is an object within the
033: * <code>Scheduler</code> the dequeue method will block the thread
034: * until such time as the objects timeout has expired or if another
035: * object has been enqueued concurrently that will allow the dequeue
036: * method to return earlier then it returns the object which has the
037: * least time to wait.
038: * <p>
039: * The waiting of an object is based on the time when it is allowed to
040: * be released. For instance if an object with a given timeout of 1000
041: * ms is enqueued then the time when that object is permitted to be
042: * released is the sum of the <code>System.currentTimeMillis</code> and
043: * the timeout which is 1000, relative to the <code>currentTime</code>,
044: * that is from the time that it is scheduled.
045: * <p>
046: * This ensures regardless of the timeout peroid an object will always
047: * be released at some time because its priority is based on the time
048: * it entered the <code>SchedulerQueue</code>.
049: *
050: * @author Niall Gallagher
051: */
052: public class Scheduler {
053:
054: /**
055: * The maximum length of time an object can be enqueued.
056: */
057: protected static final long DEFAULT_MAX = 120000L;
058:
059: /**
060: * This is the <code>PriorityQueue</code> for the objects.
061: */
062: protected SchedulerQueue queue;
063:
064: /**
065: * List of current dequeuers for this <code>Scheduler</code>.
066: */
067: protected Registry registry;
068:
069: /**
070: * The default maximum time a object can wait.
071: */
072: protected long max;
073:
074: /**
075: * The time this object was created.
076: */
077: private long start;
078:
079: /**
080: * This will create a default <code>Scheduler</code>. The default
081: * <code>Scheduler</code> will allow timeouts of up to 60000
082: * miliseconds. This means that an object that has been scheduled
083: * for release at a specified time will be guaranteed to be released
084: * at least 1 minute after it was scheduled and if the timeout is
085: * less than 1 minute then it will be released after that timeout
086: * has expired.
087: */
088: public Scheduler() {
089: this (DEFAULT_MAX);
090: }
091:
092: /**
093: * This creates a <code>Scheduler</code> object with the maximum
094: * timeout specified. This will ensure that no object will be
095: * blocked for longer than the specified timeout. If an object is
096: * enqueued with a timeout greater than the specified timeout then
097: * the timeout is capped to the max.
098: *
099: * @param max the maximum timeout for this <code>Scheduler</code>
100: */
101: public Scheduler(long max) {
102: this .queue = new SchedulerQueue();
103: this .registry = new Registry();
104: this .start = currentTime();
105: this .max = max;
106: }
107:
108: /**
109: * This schedules objects so that they will not be released
110: * until the specified timeout has expired. If this timeout
111: * is greater than the maximum timeout then it is capped and
112: * released once the maximum timeout has expired instead.
113: *
114: * @param object the object to be scheduled for a timeout
115: * @param wait time it is to wait within the <code>Scheduler</code>
116: */
117: public void enqueue(Object object, long wait) {
118: long timeout = Math.min(wait, max) + currentTime();
119: Entry entry = new Entry();
120: entry.object = object;
121: entry.timeout = timeout;
122: enqueue(entry);
123: interrupt(entry); /* N.B. */
124: }
125:
126: /**
127: * This method is important for enqueuing items as it tells the
128: * <code>Registry</code> what the timeout of an incomming item is.
129: * If the item has a timeout that is less than a dequeuer then the
130: * thread that is dequeuing the item with the larger timeout is
131: * interrupted.
132: *
133: * @param entry this is the new entry that has been enqueued
134: */
135: protected void interrupt(Entry entry) {
136: registry.interrupt(entry.timeout);
137: }
138:
139: /**
140: * This adds an entry object into the <code>SchedulerQueue</code>
141: * this will set a priority based on the time this object is to be
142: * released at, that is the sooner that the entry is to be released
143: * the higher its priority. The actual priority is the time of
144: * release times -1.
145: *
146: * @param entry the entry object that is being queued
147: */
148: protected void enqueue(Entry entry) {
149: long wait = entry.timeout;
150: long minus = -1L * wait;
151: queue.enqueue(entry, minus);
152: }
153:
154: /**
155: * This is used to dequeue the objects from the
156: * <code>Scheduler</code>. This will return an object only when it
157: * has one whose timeout has expired. If there are no objects in the
158: * <code>Scheduler</code> then it will block the thread until one
159: * becomes available. This is guaranteed to return an object once an
160: * objects timeout has expired. If a thread is blocked trying to
161: * retrive an object whose timeout has not yet expired and another
162: * object is entered concurrently which does not have to wait such a
163: * long time then the thread will become unblocked and return the
164: * object with the least timeout. This method cannot be synchronized
165: * because the thread will wait when the <code>SchedulerQueue</code>
166: * <code>isEmpty</code> and thus that thread will cause a deadlock
167: * situation, because it will have the lock.
168: *
169: * @throws InterruptedException thrown if the thread is interrupted
170: *
171: * @return the returns the next object which timeout has expired
172: */
173: public Object dequeue() throws InterruptedException {
174: while (true) {
175: Object top = queue.dequeue();
176: Entry entry = (Entry) top;
177: try {
178: sleep(entry.timeout);
179: return entry.object;
180: } catch (InterruptedException e) {
181: enqueue(entry);
182: }
183: }
184: }
185:
186: /**
187: * This will unregister the current thread from the
188: * <code>Registry</code> object. This means that this thread will
189: * no longer recive interrupts from the <code>Registry</code>.
190: */
191: protected void unregister() {
192: registry.remove();
193: }
194:
195: /**
196: * This will register with the <code>Registry</code> object. This
197: * basically means that this thread wishes to recieve interrupts
198: * if there is a thread that calls the interrupt method of the
199: * <code>Registry</code> with an wake timeout less that this
200: * threads wake timeout.
201: *
202: * @param awaken the time this thread will stop sleeping
203: */
204: protected void register(long awaken) {
205: registry.sleeping(awaken);
206: }
207:
208: /**
209: * This will put the curent thread to sleep. This thread
210: * may be woken by the <code>Registry</code> if there is a
211: * <code>Registry.interrupt</code> method invoked that has
212: * a sleep time less than the registered thread.
213: *
214: * @exception InterruptedException Thread.sleep throws this
215: *
216: * @param awaken the time that this thread will stop sleeping
217: */
218: protected void sleep(long awaken) throws InterruptedException {
219: try {
220: long time = currentTime();
221: long sleep = awaken - time;
222: if (sleep <= 0)
223: return;
224: register(awaken);
225: Thread.sleep(sleep);
226: } finally {
227: unregister();
228: }
229: }
230:
231: /**
232: * This returns the <code>currentTime</code> in milliseconds from
233: * the creation of this. The time is made smaller by subtracting
234: * the time this object was created at. This time will return an
235: * increasing time from 0 to the Long.MAX_VALUE.
236: *
237: * @return the <code>currentTime</code> minus the time that this
238: * was created.
239: */
240: protected long currentTime() {
241: return System.currentTimeMillis() - start;
242: }
243:
244: /**
245: * This is used to keep objects with there
246: * time of release. The time when this object
247: * can be dequeued is retrived by timeout.
248: */
249: public class Entry {
250: public Object object;
251: public long timeout;
252: }
253:
254: }
|