001: // $Id: Scheduler.java,v 1.13.2.1 2007/03/08 10:23:05 belaban Exp $
002:
003: package org.jgroups.util;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007: import org.jgroups.Global;
008:
009: /**
010: * Implementation of a priority scheduler. The scheduler maintains a queue to the end of which
011: * all tasks are added. It continually looks at the first queue element, assigns a thread to
012: * it, runs the thread and waits for completion. When a new <em>priority task</em> is added,
013: * it will be added to the head of the queue and the scheduler will be interrupted. In this
014: * case, the currently handled task is suspended, and the one at the head of the queue
015: * handled. This is recursive: a priority task can always be interrupted by another priority
016: * task. Resursion ends when no more priority tasks are added, or when the thread pool is
017: * exhausted.
018: *
019: * @author Bela Ban
020: */
021: public class Scheduler implements Runnable {
022: final Queue queue = new Queue();
023: Thread sched_thread = null;
024: Task current_task = null;
025: ThreadPool pool = null;
026: SchedulerListener listener = null;
027:
028: protected static final Log log = LogFactory.getLog(Scheduler.class);
029:
030: /** Process items on the queue concurrently. The default is to wait until the processing of an item
031: * has completed before fetching the next item from the queue. Note that setting this to true
032: * may destroy the properties of a protocol stack, e.g total or causal order may not be
033: * guaranteed. Set this to true only if you know what you're doing ! */
034: boolean concurrent_processing = false;
035:
036: /** max number of threads, will only be allocated when needed */
037: int NUM_THREADS = 128;
038:
039: static final int WAIT_FOR_THREAD_AVAILABILITY = 3000;
040: static final int THREAD_JOIN_TIMEOUT = 1000;
041:
042: public Scheduler() {
043: String tmp = Util.getProperty(
044: new String[] { Global.SCHEDULER_MAX_THREADS }, null,
045: null, false, "128");
046: this .NUM_THREADS = Integer.parseInt(tmp);
047: }
048:
049: public Scheduler(int num_threads) {
050: this .NUM_THREADS = num_threads;
051: }
052:
053: public void setListener(SchedulerListener l) {
054: listener = l;
055: }
056:
057: public boolean getConcurrentProcessing() {
058: return concurrent_processing;
059: }
060:
061: public void setConcurrentProcessing(boolean process_concurrently) {
062: this .concurrent_processing = process_concurrently;
063: }
064:
065: public void run() {
066: while (sched_thread != null) {
067: if (queue.closed())
068: break;
069: try {
070: current_task = (Task) queue.peek(); // get the first task in the queue (blocks until available)
071: if (current_task == null) { // @remove
072: if (log.isWarnEnabled())
073: log.warn("current task is null, queue.size()="
074: + queue.size() + ", queue.closed()="
075: + queue.closed() + ", continuing");
076: continue;
077: }
078:
079: if (current_task.suspended) {
080: current_task.suspended = false;
081: current_task.thread.resume();
082: if (listener != null)
083: listener.resumed(current_task.thread,
084: current_task.target);
085: } else {
086: if (current_task.thread == null) {
087: current_task.thread = pool.getThread();
088: if (current_task.thread == null) { // thread pool exhausted
089: if (log.isWarnEnabled())
090: log
091: .warn("thread pool exhausted, waiting for "
092: + WAIT_FOR_THREAD_AVAILABILITY
093: + "ms before retrying");
094: Util.sleep(WAIT_FOR_THREAD_AVAILABILITY);
095: continue;
096: }
097: }
098:
099: // if we get here, current_task.thread and current_task.target are guaranteed to be non-null
100: if (listener != null)
101: listener.started(current_task.thread,
102: current_task.target);
103: if (current_task.thread
104: .assignTask(current_task.target) == false)
105: continue;
106: }
107:
108: if (sched_thread.isInterrupted()) { // will continue at "catch(InterruptedException)" below
109: // sched_thread.interrupt();
110:
111: // changed on suggestion from Victor Cardoso: sched_thread.interrupt() does *not* throw an
112: // InterruptedException, so we don't land in the catch clause, but rather execute the code below
113: // (which we don't want) - bela April 15 2004
114:
115: throw new InterruptedException();
116: }
117:
118: if (concurrent_processing == false) { // this is the default: process serially
119: synchronized (current_task.thread) {
120: while (!current_task.thread.done()
121: && !current_task.thread.suspended)
122: current_task.thread.wait();
123: }
124: if (listener != null)
125: listener.stopped(current_task.thread,
126: current_task.target);
127: }
128: queue.removeElement(current_task);
129: } catch (InterruptedException interrupted) {
130: if (sched_thread == null || queue.closed())
131: break;
132: if (current_task.thread != null) {
133: current_task.thread.suspend();
134: if (listener != null)
135: listener.suspended(current_task.thread,
136: current_task.target);
137: current_task.suspended = true;
138: }
139: Thread.interrupted(); // clears the interrupt-flag
140: } catch (QueueClosedException closed_ex) {
141: return;
142: } catch (Throwable ex) {
143: if (log.isErrorEnabled())
144: log.error("exception=" + Util.print(ex));
145: }
146: }
147: if (log.isTraceEnabled())
148: log.trace("scheduler thread terminated");
149: }
150:
151: public void addPrio(Runnable task) {
152: Task new_task = new Task(task);
153: boolean do_interrupt = false;
154:
155: try {
156: synchronized (queue) { // sync against add()
157: if (queue.size() == 0)
158: queue.add(new_task);
159: else {
160: queue.addAtHead(new_task);
161: do_interrupt = true;
162: }
163: }
164: if (do_interrupt) // moved out of 'synchronized(queue)' to minimize lock contention
165: sched_thread.interrupt();
166: } catch (Throwable e) {
167: if (log.isErrorEnabled())
168: log.error("exception=" + e);
169: }
170: }
171:
172: public void add(Runnable task) {
173: Task new_task = new Task(task);
174:
175: try {
176: synchronized (queue) { // sync against addPrio()
177: queue.add(new_task);
178: }
179: } catch (Exception e) {
180: if (log.isErrorEnabled())
181: log.error("exception=" + e);
182: }
183: }
184:
185: public void start() {
186: if (queue.closed())
187: queue.reset();
188: if (sched_thread == null) {
189: pool = new ThreadPool(NUM_THREADS);
190: sched_thread = new Thread(this , "Scheduler main thread");
191: sched_thread.setDaemon(true);
192: sched_thread.start();
193: }
194: }
195:
196: /**
197: * Stop the scheduler thread. The thread may be waiting for its next task (queue.peek()) or it may be waiting on
198: * the currently executing thread. In the first case, closing the queue will throw a QueueClosed exception which
199: * terminates the scheduler thread. In the second case, after closing the queue, we interrupt the scheduler thread,
200: * which then checks whether the queue is closed. If this is the case, the scheduler thread terminates.
201: */
202: public void stop() {
203: Thread tmp = null;
204:
205: // 1. Close the queue
206: queue.close(false); // will stop thread at next peek();
207:
208: // 2. Interrupt the scheduler thread
209: if (sched_thread != null && sched_thread.isAlive()) {
210: tmp = sched_thread;
211: sched_thread = null;
212: tmp.interrupt();
213: try {
214: tmp.join(THREAD_JOIN_TIMEOUT);
215: } catch (Exception ex) {
216: }
217:
218: if (tmp.isAlive())
219: if (log.isErrorEnabled())
220: log
221: .error("scheduler thread is still not dead !!!");
222: }
223: sched_thread = null;
224:
225: // 3. Delete the thread pool
226: if (pool != null) {
227: pool.destroy();
228: pool = null;
229: }
230: }
231:
232: public static class Task {
233: ReusableThread thread = null;
234: Runnable target = null;
235: boolean suspended = false;
236:
237: Task(Runnable target) {
238: this .target = target;
239: }
240:
241: public String toString() {
242: return "[thread=" + thread + ", target=" + target
243: + ", suspended=" + suspended + ']';
244: }
245: }
246:
247: }
|