001: // $Id: ReusableThread.java,v 1.7.16.1 2007/03/08 10:22:05 belaban Exp $
002:
003: package org.jgroups.util;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007:
008: /**
009: * Reusable thread class. Instead of creating a new thread per task, this instance can be reused
010: * to run different tasks in turn. This is done by looping and assigning the Runnable task objects
011: * whose <code>run</code> method is then called.<br>
012: * Tasks are Runnable objects and should be prepared to terminate when they receive an
013: * InterruptedException. This is thrown by the stop() method.<br>
014: * <p/>
015: * The following situations have to be tested:
016: * <ol>
017: * <li>ReusableThread is started. Then, brefore assigning a task, it is stopped again
018: * <li>ReusableThread is started, assigned a long running task. Then, before task is done,
019: * stop() is called
020: * <li>ReusableThread is started, assigned a task. Then waitUntilDone() is called, then stop()
021: * <li>ReusableThread is started, assigned a number of tasks (waitUntilDone() called between tasks),
022: * then stopped
023: * <li>ReusableThread is started, assigned a task
024: * </ol>
025: *
026: * @author Bela Ban
027: */
028: public class ReusableThread implements Runnable {
029: volatile Thread thread = null; // thread that works on the task
030: Runnable task = null; // task assigned to thread
031: private ThreadLocalListener tl_listener = null;
032: String thread_name = "ReusableThread";
033: volatile boolean suspended = false;
034: protected static final Log log = LogFactory
035: .getLog(ReusableThread.class);
036: static final long TASK_JOIN_TIME = 3000; // wait 3 secs for an interrupted thread to terminate
037:
038: public ReusableThread() {
039: }
040:
041: public ReusableThread(String thread_name) {
042: this .thread_name = thread_name;
043: }
044:
045: public boolean done() {
046: return task == null;
047: }
048:
049: public boolean available() {
050: return done();
051: }
052:
053: public boolean isAlive() {
054: synchronized (this ) {
055: return thread != null && thread.isAlive();
056: }
057: }
058:
059: /**
060: * Will always be called from synchronized method, no need to do our own synchronization
061: */
062: public void start() {
063: if (thread == null || (thread != null && !thread.isAlive())) {
064: thread = new Thread(this , thread_name);
065: thread.setDaemon(true);
066: thread.start();
067: }
068: }
069:
070: /**
071: * Stops the thread by setting thread=null and interrupting it. The run() method catches the
072: * InterruptedException and checks whether thread==null. If this is the case, it will terminate
073: */
074: public void stop() {
075: Thread tmp = null;
076:
077: if (log.isTraceEnabled())
078: log.trace("entering THIS");
079: synchronized (this ) {
080: if (log.isTraceEnabled())
081: log.trace("entered THIS (thread=" + printObj(thread)
082: + ", task=" + printObj(task) + ", suspended="
083: + suspended + ')');
084: if (thread != null && thread.isAlive()) {
085: tmp = thread;
086: thread = null; // signals the thread to stop
087: task = null;
088: if (log.isTraceEnabled())
089: log.trace("notifying thread");
090: notifyAll();
091: if (log.isTraceEnabled())
092: log.trace("notifying thread completed");
093: }
094: thread = null;
095: task = null;
096: }
097:
098: if (tmp != null && tmp.isAlive()) {
099: long s1 = System.currentTimeMillis(), s2 = 0;
100: if (log.isTraceEnabled())
101: log.trace("join(" + TASK_JOIN_TIME + ')');
102:
103: tmp.interrupt();
104:
105: try {
106: tmp.join(TASK_JOIN_TIME);
107: } catch (Exception e) {
108: }
109: s2 = System.currentTimeMillis();
110: if (log.isTraceEnabled())
111: log.trace("join(" + TASK_JOIN_TIME + ") completed in "
112: + (s2 - s1));
113: if (tmp.isAlive())
114: if (log.isErrorEnabled())
115: log.error("thread is still alive");
116: tmp = null;
117: }
118: }
119:
120: /**
121: * Suspends the thread. Does nothing if already suspended. If a thread is waiting to be assigned a task, or
122: * is currently running a (possibly long-running) task, then it will be suspended the next time it
123: * waits for suspended==false (second wait-loop in run())
124: */
125:
126: public void suspend() {
127: synchronized (this ) {
128: if (log.isTraceEnabled())
129: log.trace("suspended=" + suspended + ", task="
130: + printObj(task));
131: if (!suspended) {
132: suspended = true;
133: }
134: }
135: }
136:
137: /**
138: * Resumes the thread. Noop if not suspended
139: */
140: public void resume() {
141: synchronized (this ) {
142: suspended = false;
143: notifyAll(); // notifies run(): the wait on suspend() is released
144: }
145: }
146:
147: /**
148: * Assigns a task to the thread. If the thread is not running, it will be started. It it is
149: * already working on a task, it will reject the new task. Returns true if task could be
150: * assigned auccessfully
151: */
152: public boolean assignTask(Runnable t) {
153: synchronized (this ) {
154: start(); // creates and starts the thread if not yet running
155: if (task == null) {
156: task = t;
157: notifyAll(); // signals run() to start working (first wait-loop)
158: return true;
159: } else {
160: if (log.isErrorEnabled())
161: log
162: .error("already working on a thread: current_task="
163: + task
164: + ", new task="
165: + t
166: + ", thread="
167: + thread
168: + ", is alive="
169: + (thread != null ? String
170: .valueOf(thread.isAlive())
171: : "null"));
172: return false;
173: }
174: }
175: }
176:
177: /**
178: * Assigns a ThreadLocalListener to the current ReusableThread. The ThreadLocalListener
179: * sets ThreadLocal's values for the lifetime of the task for the thread that is used
180: * to run the task. The ThreadLocalListener will reset values upon the task returning
181: * and at this point will be deleted.
182: */
183: public void assignThreadLocalListener(
184: ThreadLocalListener tl_listener) {
185: this .tl_listener = tl_listener;
186: }
187:
188: /**
189: * Delicate piece of code (means very important :-)). Works as follows: loops until stop is true.
190: * Waits in a loop until task is assigned. Then runs the task and notifies waiters that it's done
191: * when task is completed. Then returns to the first loop to wait for more work. Does so until
192: * stop() is called, which sets stop=true and interrupts the thread. If waiting for a task, the
193: * thread terminates. If running a task, the task is interrupted, and the thread terminates. If the
194: * task is not interrupible, the stop() method will wait for 3 secs (join on the thread), then return.
195: * This means that the run() method of the task will complete and only then will the thread be
196: * garbage-collected.
197: */
198: public void run() {
199: while (thread != null) { // Stop sets thread=null
200: try {
201: if (log.isTraceEnabled())
202: log.trace("entering ASSIGN");
203: synchronized (this ) {
204: if (log.isTraceEnabled())
205: log.trace("entered ASSIGN (task="
206: + printObj(task) + ", thread="
207: + printObj(thread) + ')');
208:
209: while (task == null && thread != null) { // first wait-loop: wait for task to be assigned (assignTask())
210: if (log.isTraceEnabled())
211: log.trace("wait ASSIGN");
212: wait();
213: if (log.isTraceEnabled())
214: log.trace("wait ASSIGN completed");
215: }
216: }
217: } catch (InterruptedException ex) { // on assignTask()
218: if (log.isTraceEnabled())
219: log.trace("interrupt on ASSIGN");
220: }
221: if (thread == null)
222: return; // we need to terminate
223:
224: try {
225: if (log.isTraceEnabled())
226: log.trace("entering SUSPEND");
227: synchronized (this ) {
228: if (log.isTraceEnabled())
229: log.trace("entered SUSPEND (suspended="
230: + suspended + ", task="
231: + printObj(task) + ')');
232: while (suspended && thread != null) { // second wait-loop: wait for thread to resume (resume())
233: if (log.isTraceEnabled())
234: log.trace("wait SUSPEND");
235: wait();
236: if (log.isTraceEnabled())
237: log.trace("wait SUSPEND completed");
238: }
239: }
240: } catch (InterruptedException ex) { // on resume()
241: if (log.isTraceEnabled())
242: log.trace("interrupt on RESUME");
243: }
244: if (thread == null)
245: return; // we need to terminate
246:
247: if (task != null) {
248: if (log.isTraceEnabled())
249: log.trace("setting ThreadLocal(s)");
250: if (tl_listener != null)
251: tl_listener.setThreadLocal();
252: if (log.isTraceEnabled())
253: log.trace("running task");
254: try {
255: task.run(); //here we are actually running the task
256: } catch (Throwable ex) {
257: if (log.isErrorEnabled())
258: log.error("failed running task", ex);
259: } finally {
260: if (log.isTraceEnabled())
261: log.trace("resetting ThreadLocal(s)");
262: if (tl_listener != null)
263: tl_listener.resetThreadLocal();
264: tl_listener = null;
265: }
266: if (log.isTraceEnabled())
267: log.trace("task completed");
268: }
269:
270: if (log.isTraceEnabled())
271: log.trace("entering THIS");
272: synchronized (this ) {
273: if (log.isTraceEnabled())
274: log.trace("entered THIS");
275: task = null;
276: if (log.isTraceEnabled())
277: log.trace("notify THIS");
278: notifyAll();
279: if (log.isTraceEnabled())
280: log.trace("notify THIS completed");
281: }
282: }
283: if (log.isTraceEnabled())
284: log.trace("terminated");
285: }
286:
287: String printObj(Object obj) {
288: if (obj == null)
289: return "null";
290: else
291: return "non-null";
292: }
293:
294: public void waitUntilDone() {
295:
296: if (log.isTraceEnabled())
297: log.trace("entering THIS");
298: synchronized (this ) {
299: if (log.isTraceEnabled())
300: log.trace("entered THIS (task=" + printObj(task) + ')');
301: while (task != null) {
302: try {
303: if (log.isTraceEnabled())
304: log.trace("wait THIS");
305: wait();
306: if (log.isTraceEnabled())
307: log.trace("wait THIS completed");
308: } catch (InterruptedException interrupted) {
309: }
310: }
311: }
312: }
313:
314: public String toString() {
315: return "suspended=" + suspended;
316: }
317:
318: }
|