001: /*
002: * @(#)QueueThread.java 0.9.0 06/04/2000 - 13:31:52
003: *
004: * Copyright (C) 2000,,2003 2002 Matt Albrecht
005: * groboclown@users.sourceforge.net
006: * http://groboutils.sourceforge.net
007: *
008: * Permission is hereby granted, free of charge, to any person obtaining a
009: * copy of this software and associated documentation files (the "Software"),
010: * to deal in the Software without restriction, including without limitation
011: * the rights to use, copy, modify, merge, publish, distribute, sublicense,
012: * and/or sell copies of the Software, and to permit persons to whom the
013: * Software is furnished to do so, subject to the following conditions:
014: *
015: * The above copyright notice and this permission notice shall be included in
016: * all copies or substantial portions of the Software.
017: *
018: * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
019: * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
020: * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
021: * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
022: * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
023: * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
024: * DEALINGS IN THE SOFTWARE.
025: */
026:
027: package net.sourceforge.groboutils.util.thread.v1;
028:
029: import net.sourceforge.groboutils.util.datastruct.v1.SynchQueue;
030:
031: /**
032: * For threads which endlessly process events from a SynchQueue. This
033: * is a common technique for thread pooling.
034: * <P>
035: * Users must make an implementation of <tt>IObjectListener</tt>.
036: * If the user does not give a <tt>SynchQueue</tt>, it is created for them.
037: * Once the QueueThread is created, the queue and the listener cannot
038: * be changed.
039: * <P>
040: * By default, the underlying {@link LoopThread} does not sleep between events,
041: * it is a daemon thread, runs on the lowest thread priority, and has
042: * not started yet. Of course, this can all be changed.
043: * <P>
044: * It is advisable not to use the methods {@link
045: * LoopThread#setSleepTime( int )} or
046: * {@link LoopThread#setSleepTimeMillis( long )}, since these will cause the
047: * <tt>QueueThread</tt> to not respond to incoming requests during this sleep
048: * time. However, there may be occasions when this is necessary, so this is
049: * left available to the user.
050: * <P>
051: * Since the {@link SynchQueue#dequeue()} method can wait indefinitely, you may
052: * opt to set the dequeue's timeout to something other than 0. Without setting
053: * this, the thread may wait indefinitely for an incoming element to the queue
054: * without ever checking for a {@link LoopThread#stop()} or
055: * {@link LoopThread#suspend()} signal. Thanks to
056: * <a href="mailto:d.gallot@atoseuronext.be">Dominique Gallot</a> for pointing
057: * this out.
058: * <P>
059: * After a {@link LoopThread#stop()} is invoked, you may allow the
060: * object listener to finish processing the remaining elements in the
061: * inner queue by calling {@link #processRemaining()}.
062: *
063: * @author Matt Albrecht <a href="mailto:groboclown@users.sourceforge.net">groboclown@users.sourceforge.net</a>
064: * @since June 4, 2000
065: * @version $Date: 2003/02/10 22:52:48 $
066: */
067: public class QueueThread extends LoopThread {
068: private IObjectListener m_objListener = null;
069:
070: private SynchQueue m_queue = null;
071:
072: private boolean m_isProcessingObject = false;
073:
074: private long m_timeout = 0;
075: private int m_nanos = 0;
076:
077: /**
078: * The runnable class - keep it private so no one
079: * can directly access it in a thread but us.
080: */
081: private class QueueRunnable implements Runnable {
082: public void run() {
083: if (m_queue.isEmpty()) {
084: m_isProcessingObject = false;
085: }
086:
087: try {
088: Object o = m_queue.dequeue(m_timeout, m_nanos);
089: m_isProcessingObject = true;
090: m_objListener.processObject(o);
091:
092: if (m_queue.isEmpty()) {
093: m_isProcessingObject = false;
094: }
095: } catch (InterruptedException ie) {
096: // whoops - how do we handle this?
097: }
098: }
099: }
100:
101: //--------------------------------------------------------------
102: // Constructors
103:
104: /**
105: *
106: */
107: public QueueThread(IObjectListener ol) {
108: this (ol, new SynchQueue());
109: }
110:
111: /**
112: *
113: */
114: public QueueThread(IObjectListener ol, SynchQueue sq) {
115: super ();
116:
117: initialize(ol, sq);
118: }
119:
120: /**
121: *
122: */
123: public QueueThread(IObjectListener ol, ThreadGroup tg) {
124: this (ol, new SynchQueue(), tg);
125: }
126:
127: /**
128: *
129: */
130: public QueueThread(IObjectListener ol, SynchQueue sq, ThreadGroup tg) {
131: super (null, tg);
132:
133: initialize(ol, sq);
134: }
135:
136: /**
137: *
138: */
139: public QueueThread(IObjectListener ol, String threadName) {
140: this (ol, new SynchQueue(), threadName);
141: }
142:
143: /**
144: *
145: */
146: public QueueThread(IObjectListener ol, SynchQueue sq,
147: String threadName) {
148: super (null, threadName);
149:
150: initialize(ol, sq);
151: }
152:
153: /**
154: *
155: */
156: public QueueThread(IObjectListener ol, ThreadGroup tg,
157: String threadName) {
158: this (ol, new SynchQueue(), tg, threadName);
159: }
160:
161: /**
162: *
163: */
164: public QueueThread(IObjectListener ol, SynchQueue sq,
165: ThreadGroup tg, String threadName) {
166: super (null, tg, threadName);
167:
168: initialize(ol, sq);
169: }
170:
171: //--------------------------------------------------------------
172: // Public Methods
173:
174: /**
175: * Retrieves the internal listened queue.
176: */
177: public SynchQueue getQueue() {
178: return this .m_queue;
179: }
180:
181: /**
182: * @return <tt>false</tt> if the thread is waiting for an object
183: * to be placed in the queue and be processed, otherwise
184: * <tt>true</tt>.
185: */
186: public boolean isProcessingObjects() {
187: return this .m_isProcessingObject;
188: }
189:
190: /**
191: * Set the maximum time (in milliseconds and nanoseconds) to wait for
192: * an incoming element on the inner queue before checking for
193: * {@link LoopThread#stop()} and {@link LoopThread#suspend()}
194: * signals.
195: *
196: * @param timeout the maximum time to wait in milliseconds.
197: * @param nanos additional time, in nanoseconds range 0-999999.
198: * @see SynchQueue#dequeue( long, int )
199: */
200: public void setTimeout(long timeout, int nanos) {
201: this .m_timeout = timeout;
202: this .m_nanos = nanos;
203: }
204:
205: /**
206: * Set the maximum time (in milliseconds) to wait for
207: * an incoming element on the inner queue before checking for
208: * {@link LoopThread#stop()} and {@link LoopThread#suspend()}
209: * signals.
210: *
211: * @param timeout the maximum time to wait in milliseconds.
212: * @param nanos additional time, in nanoseconds range 0-999999.
213: * @see SynchQueue#dequeue( long, int )
214: * @see #setTimeout( long, int )
215: */
216: public void setTimeout(long timeout) {
217: setTimeout(timeout, 0);
218: }
219:
220: /**
221: * Retrieve the millisecond part of the maximum timeout to wait for an
222: * incoming element on the inner queue before checking for thread
223: * event signals.
224: *
225: * @see #setTimeout( long, int )
226: */
227: public long getTimeoutMilliseconds() {
228: return this .m_timeout;
229: }
230:
231: /**
232: * Retrieve the nanosecond part of the maximum timeout to wait for an
233: * incoming element on the inner queue before checking for thread
234: * event signals.
235: *
236: * @see #setTimeout( long, int )
237: */
238: public int getTimeoutNanoseconds() {
239: return this .m_nanos;
240: }
241:
242: /**
243: * Process all elements in the queue until the queue is empty.
244: * This may only be called while the thread is not running.
245: * <P>
246: * This should be invoked with care, as it can cause an infinite
247: * loop if another thread is pushing in data after this processing
248: * thread has finished (but, that could also lead to an out-of-memory
249: * error if this method is never invoked).
250: */
251: public void processRemaining() throws InterruptedException {
252: if (isRunning()) {
253: throw new IllegalStateException(
254: "cannot call processRemaining() while the underlying thread "
255: + "is still running.");
256: }
257: if (!this .m_queue.isEmpty()) {
258: this .m_isProcessingObject = true;
259: do {
260: Object o = this .m_queue.dequeue();
261: this .m_objListener.processObject(o);
262: } while (!this .m_queue.isEmpty());
263: this .m_isProcessingObject = false;
264: }
265: }
266:
267: //--------------------------------------------------------------
268: // Protected Methods
269:
270: protected void initialize(IObjectListener ol, SynchQueue sq) {
271: setRunnable(new QueueRunnable());
272:
273: this .m_objListener = ol;
274: this .m_queue = sq;
275:
276: initializeDefaults();
277: }
278:
279: protected void initializeDefaults() {
280: setSleepTime(0);
281: setDaemon(true);
282: setPriority(Thread.MIN_PRIORITY);
283: }
284:
285: }
|