001: /*
002: File: QueuedExecutor.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 21Jun1998 dl Create public version
012: 28aug1998 dl rely on ThreadFactoryUser, restart now public
013: 4may1999 dl removed redundant interrupt detect
014: 7sep2000 dl new shutdown methods
015: */
016:
017: package EDU.oswego.cs.dl.util.concurrent;
018:
019: /**
020: *
021: * An implementation of Executor that queues incoming
022: * requests until they can be processed by a single background
023: * thread.
024: * <p>
025: * The thread is not actually started until the first
026: * <code>execute</code> request is encountered. Also, if the
027: * thread is stopped for any reason (for example, after hitting
028: * an unrecoverable exception in an executing task), one is started
029: * upon encountering a new request, or if <code>restart()</code> is
030: * invoked.
031: * <p>
032: * Beware that, especially in situations
033: * where command objects themselves invoke execute, queuing can
034: * sometimes lead to lockups, since commands that might allow
035: * other threads to terminate do not run at all when they are in the queue.
036: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
037: **/
038: public class QueuedExecutor extends ThreadFactoryUser implements
039: Executor {
040:
041: /** The thread used to process commands **/
042: protected Thread thread_;
043:
044: /** Special queue element to signal termination **/
045: protected static Runnable ENDTASK = new Runnable() {
046: public void run() {
047: }
048: };
049:
050: /** true if thread should shut down after processing current task **/
051: protected volatile boolean shutdown_; // latches true;
052:
053: /**
054: * Return the thread being used to process commands, or
055: * null if there is no such thread. You can use this
056: * to invoke any special methods on the thread, for
057: * example, to interrupt it.
058: **/
059: public synchronized Thread getThread() {
060: return thread_;
061: }
062:
063: /** set thread_ to null to indicate termination **/
064: protected synchronized void clearThread() {
065: thread_ = null;
066: }
067:
068: /** The queue **/
069: protected final Channel queue_;
070:
071: /**
072: * The runloop is isolated in its own Runnable class
073: * just so that the main
074: * class need not implement Runnable, which would
075: * allow others to directly invoke run, which would
076: * never make sense here.
077: **/
078: protected class RunLoop implements Runnable {
079: public void run() {
080: try {
081: while (!shutdown_) {
082: Runnable task = (Runnable) (queue_.take());
083: if (task == ENDTASK) {
084: shutdown_ = true;
085: break;
086: } else if (task != null) {
087: task.run();
088: task = null;
089: } else
090: break;
091: }
092: } catch (InterruptedException ex) {
093: } // fallthrough
094: finally {
095: clearThread();
096: }
097: }
098: }
099:
100: protected final RunLoop runLoop_;
101:
102: /**
103: * Construct a new QueuedExecutor that uses
104: * the supplied Channel as its queue.
105: * <p>
106: * This class does not support any methods that
107: * reveal this queue. If you need to access it
108: * independently (for example to invoke any
109: * special status monitoring operations), you
110: * should record a reference to it separately.
111: **/
112:
113: public QueuedExecutor(Channel queue) {
114: queue_ = queue;
115: runLoop_ = new RunLoop();
116: }
117:
118: /**
119: * Construct a new QueuedExecutor that uses
120: * a BoundedLinkedQueue with the current
121: * DefaultChannelCapacity as its queue.
122: **/
123:
124: public QueuedExecutor() {
125: this (new BoundedLinkedQueue());
126: }
127:
128: /**
129: * Start (or restart) the background thread to process commands. It has
130: * no effect if a thread is already running. This
131: * method can be invoked if the background thread crashed
132: * due to an unrecoverable exception.
133: **/
134:
135: public synchronized void restart() {
136: if (thread_ == null && !shutdown_) {
137: thread_ = threadFactory_.newThread(runLoop_);
138: thread_.start();
139: }
140: }
141:
142: /**
143: * Arrange for execution of the command in the
144: * background thread by adding it to the queue.
145: * The method may block if the channel's put
146: * operation blocks.
147: * <p>
148: * If the background thread
149: * does not exist, it is created and started.
150: **/
151: public void execute(Runnable command) throws InterruptedException {
152: restart();
153: queue_.put(command);
154: }
155:
156: /**
157: * Terminate background thread after it processes all
158: * elements currently in queue. Any tasks entered after this point will
159: * not be processed. A shut down thread cannot be restarted.
160: * This method may block if the task queue is finite and full.
161: * Also, this method
162: * does not in general apply (and may lead to comparator-based
163: * exceptions) if the task queue is a priority queue.
164: **/
165: public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks() {
166: if (thread_ != null && !shutdown_) {
167: try {
168: queue_.put(ENDTASK);
169: } catch (InterruptedException ex) {
170: Thread.currentThread().interrupt();
171: }
172: }
173: }
174:
175: /**
176: * Terminate background thread after it processes the
177: * current task, removing other queued tasks and leaving them unprocessed.
178: * A shut down thread cannot be restarted.
179: **/
180: public synchronized void shutdownAfterProcessingCurrentTask() {
181: shutdown_ = true;
182: if (thread_ != null) {
183: try {
184: while (queue_.poll(0) != null)
185: ; // drain
186: queue_.put(ENDTASK);
187: } catch (InterruptedException ex) {
188: Thread.currentThread().interrupt();
189: }
190: }
191: }
192:
193: /**
194: * Terminate background thread even if it is currently processing
195: * a task. This method uses Thread.interrupt, so relies on tasks
196: * themselves responding appropriately to interruption. If the
197: * current tasks does not terminate on interruption, then the
198: * thread will not terminate until processing current task.
199: * A shut down thread cannot be restarted.
200: **/
201: public synchronized void shutdownNow() {
202: shutdown_ = true;
203: if (thread_ != null) {
204: thread_.interrupt();
205: shutdownAfterProcessingCurrentTask();
206: }
207: }
208:
209: }
|