001: /*
002:
003: ============================================================================
004: The Apache Software License, Version 1.1
005: ============================================================================
006:
007: Copyright (C) 1999-2003 The Apache Software Foundation. All rights reserved.
008:
009: Redistribution and use in source and binary forms, with or without modifica-
010: tion, are permitted provided that the following conditions are met:
011:
012: 1. Redistributions of source code must retain the above copyright notice,
013: this list of conditions and the following disclaimer.
014:
015: 2. Redistributions in binary form must reproduce the above copyright notice,
016: this list of conditions and the following disclaimer in the documentation
017: and/or other materials provided with the distribution.
018:
019: 3. The end-user documentation included with the redistribution, if any, must
020: include the following acknowledgment: "This product includes software
021: developed by the Apache Software Foundation (http://www.apache.org/)."
022: Alternately, this acknowledgment may appear in the software itself, if
023: and wherever such third-party acknowledgments normally appear.
024:
025: 4. The names "Batik" and "Apache Software Foundation" must not be
026: used to endorse or promote products derived from this software without
027: prior written permission. For written permission, please contact
028: apache@apache.org.
029:
030: 5. Products derived from this software may not be called "Apache", nor may
031: "Apache" appear in their name, without prior written permission of the
032: Apache Software Foundation.
033:
034: THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
035: INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
036: FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
037: APACHE SOFTWARE FOUNDATION OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
038: INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLU-
039: DING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
040: OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
041: ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
042: (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
043: THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
044:
045: This software consists of voluntary contributions made by many individuals
046: on behalf of the Apache Software Foundation. For more information on the
047: Apache Software Foundation, please see <http://www.apache.org/>.
048:
049: */
050:
051: package org.apache.batik.util;
052:
053: import java.util.Iterator;
054:
055: /**
056: * This class represents an object which queues Runnable objects for
057: * invocation in a single thread.
058: *
059: * @author <a href="mailto:stephane@hillion.org">Stephane Hillion</a>
060: * @version $Id$
061: */
062: public class RunnableQueue implements Runnable {
063:
064: /**
065: * Type-safe enumeration of queue states.
066: */
067: public static class RunnableQueueState extends Object {
068: final String value;
069:
070: private RunnableQueueState(String value) {
071: this .value = value.intern();
072: }
073:
074: public String getValue() {
075: return value;
076: }
077:
078: public String toString() {
079: return "[RunnableQueueState: " + value + "]";
080: }
081: }
082:
083: /**
084: * The queue is in the processes of running tasks.
085: */
086: public static final RunnableQueueState RUNNING = new RunnableQueueState(
087: "Running");
088:
089: /**
090: * The queue may still be running tasks but as soon as possible
091: * will go to SUSPENDED state.
092: */
093: public static final RunnableQueueState SUSPENDING = new RunnableQueueState(
094: "Suspending");
095:
096: /**
097: * The queue is no longer running any tasks and will not
098: * run any tasks until resumeExecution is called.
099: */
100: public static final RunnableQueueState SUSPENDED = new RunnableQueueState(
101: "Suspended");
102:
103: /**
104: * The Suspension state of this thread.
105: */
106: protected RunnableQueueState state;
107:
108: /**
109: * Object to synchronize/wait/notify for suspension
110: * issues.
111: */
112: protected Object stateLock = new Object();
113:
114: /**
115: * The Runnable objects list, also used as synchoronization point
116: * for pushing/poping runables.
117: */
118: protected DoublyLinkedList list = new DoublyLinkedList();
119:
120: protected int preemptCount = 0;
121:
122: /**
123: * The object which handle run events.
124: */
125: protected RunHandler runHandler;
126:
127: /**
128: * The current thread.
129: */
130: protected Thread runnableQueueThread;
131:
132: /**
133: * Creates a new RunnableQueue started in a new thread.
134: * @return a RunnableQueue which is garanteed to have entered its
135: * <tt>run()</tt> method.
136: */
137: public static RunnableQueue createRunnableQueue() {
138: RunnableQueue result = new RunnableQueue();
139: synchronized (result) {
140: Thread t = new Thread(result, "RunnableQueue-"
141: + threadCount++);
142: t.setDaemon(true);
143: t.start();
144: while (result.getThread() == null) {
145: try {
146: result.wait();
147: } catch (InterruptedException ie) {
148: }
149: }
150: }
151: return result;
152: }
153:
154: private static int threadCount;
155:
156: /**
157: * Runs this queue.
158: */
159: public void run() {
160: synchronized (this ) {
161: runnableQueueThread = Thread.currentThread();
162: // Wake the create method so it knows we are in
163: // our run and ready to go.
164: notify();
165: }
166:
167: Link l;
168: Runnable rable;
169: try {
170: while (!Thread.currentThread().isInterrupted()) {
171:
172: // Mutex for suspention work.
173: synchronized (stateLock) {
174: if (state != RUNNING) {
175: state = SUSPENDED;
176:
177: // notify suspendExecution in case it is
178: // waiting til we shut down.
179: stateLock.notifyAll();
180:
181: executionSuspended();
182:
183: while (state != RUNNING) {
184: state = SUSPENDED;
185: // Wait until resumeExecution called.
186: stateLock.wait();
187: }
188:
189: executionResumed();
190: }
191: }
192:
193: // The following seriously stress tests the class
194: // for stuff happening between the two sync blocks.
195: //
196: // try {
197: // Thread.sleep(1);
198: // } catch (InterruptedException ie) { }
199:
200: synchronized (list) {
201: if (state == SUSPENDING)
202: continue;
203: l = (Link) list.pop();
204: if (preemptCount != 0)
205: preemptCount--;
206: if (l == null) {
207: // No item to run, wait till there is one.
208: list.wait();
209: continue; // start loop over again...
210: }
211:
212: rable = l.runnable;
213: }
214:
215: try {
216: rable.run();
217: } catch (ThreadDeath td) {
218: // Let it kill us...
219: throw td;
220: } catch (Throwable t) {
221: // Might be nice to notify someone directly.
222: // But this is more or less what Swing does.
223: t.printStackTrace();
224: }
225: l.unlock();
226: runnableInvoked(rable);
227: }
228: } catch (InterruptedException e) {
229: } finally {
230: synchronized (this ) {
231: runnableQueueThread = null;
232: }
233: }
234: }
235:
236: /**
237: * Returns the thread in which the RunnableQueue is currently running.
238: * @return null if the RunnableQueue has not entered his
239: * <tt>run()</tt> method.
240: */
241: public Thread getThread() {
242: return runnableQueueThread;
243: }
244:
245: /**
246: * Schedules the given Runnable object for a later invocation, and
247: * returns.
248: * An exception is thrown if the RunnableQueue was not started.
249: * @throws IllegalStateException if getThread() is null.
250: */
251: public void invokeLater(Runnable r) {
252: if (runnableQueueThread == null) {
253: throw new IllegalStateException(
254: "RunnableQueue not started or has exited");
255: }
256: synchronized (list) {
257: list.push(new Link(r));
258: list.notify();
259: }
260: }
261:
262: /**
263: * Waits until the given Runnable's <tt>run()</tt> has returned.
264: * <em>Note: <tt>invokeAndWait()</tt> must not be called from the
265: * current thread (for example from the <tt>run()</tt> method of the
266: * argument).
267: * @throws IllegalStateException if getThread() is null or if the
268: * thread returned by getThread() is the current one.
269: */
270: public void invokeAndWait(Runnable r) throws InterruptedException {
271: if (runnableQueueThread == null) {
272: throw new IllegalStateException(
273: "RunnableQueue not started or has exited");
274: }
275: if (runnableQueueThread == Thread.currentThread()) {
276: throw new IllegalStateException(
277: "Cannot be called from the RunnableQueue thread");
278: }
279:
280: LockableLink l = new LockableLink(r);
281: synchronized (list) {
282: list.push(l);
283: list.notify();
284: }
285: l.lock();
286: }
287:
288: /**
289: * Schedules the given Runnable object for a later invocation, and
290: * returns. The given runnable preempts any runnable that is not
291: * currently executing (ie the next runnable started will be the
292: * one given). An exception is thrown if the RunnableQueue was
293: * not started.
294: * @throws IllegalStateException if getThread() is null.
295: */
296: public void preemptLater(Runnable r) {
297: if (runnableQueueThread == null) {
298: throw new IllegalStateException(
299: "RunnableQueue not started or has exited");
300: }
301: synchronized (list) {
302: list.add(preemptCount, new Link(r));
303: preemptCount++;
304: list.notify();
305: }
306: }
307:
308: /**
309: * Waits until the given Runnable's <tt>run()</tt> has returned.
310: * The given runnable preempts any runnable that is not currently
311: * executing (ie the next runnable started will be the one given).
312: * <em>Note: <tt>preemptAndWait()</tt> must not be called from the
313: * current thread (for example from the <tt>run()</tt> method of the
314: * argument).
315: * @throws IllegalStateException if getThread() is null or if the
316: * thread returned by getThread() is the current one.
317: */
318: public void preemptAndWait(Runnable r) throws InterruptedException {
319: if (runnableQueueThread == null) {
320: throw new IllegalStateException(
321: "RunnableQueue not started or has exited");
322: }
323: if (runnableQueueThread == Thread.currentThread()) {
324: throw new IllegalStateException(
325: "Cannot be called from the RunnableQueue thread");
326: }
327:
328: LockableLink l = new LockableLink(r);
329: synchronized (list) {
330: list.add(preemptCount, l);
331: preemptCount++;
332: list.notify();
333: }
334: l.lock();
335: }
336:
337: public RunnableQueueState getQueueState() {
338: synchronized (stateLock) {
339: return state;
340: }
341: }
342:
343: /**
344: * Suspends the execution of this queue after the current runnable
345: * completes.
346: * @param waitTillSuspended if true this method will not return
347: * until the queue has suspended (no runnable in progress
348: * or about to be in progress). If resumeExecution is
349: * called while waiting will simply return (this really
350: * indicates a race condition in your code). This may
351: * return before an associated RunHandler is notified.
352: * @throws IllegalStateException if getThread() is null. */
353: public void suspendExecution(boolean waitTillSuspended) {
354: if (runnableQueueThread == null) {
355: throw new IllegalStateException(
356: "RunnableQueue not started or has exited");
357: }
358: synchronized (stateLock) {
359: if (state == SUSPENDED)
360: // already suspended...
361: return;
362:
363: if (state == RUNNING) {
364: state = SUSPENDING;
365: synchronized (list) {
366: // Wake up run thread if it is waiting for jobs,
367: // so we go into the suspended case (notifying
368: // run-handler etc...)
369: list.notify();
370: }
371: }
372:
373: if (waitTillSuspended)
374: try {
375: stateLock.wait();
376: } catch (InterruptedException ie) {
377: }
378: }
379: }
380:
381: /**
382: * Resumes the execution of this queue.
383: * @throws IllegalStateException if getThread() is null.
384: */
385: public void resumeExecution() {
386: if (runnableQueueThread == null) {
387: throw new IllegalStateException(
388: "RunnableQueue not started or has exited");
389: }
390:
391: synchronized (stateLock) {
392: if (state != RUNNING) {
393: state = RUNNING;
394: stateLock.notifyAll(); // wake it up.
395: }
396: }
397: }
398:
399: /**
400: * Returns iterator lock to use to work with the iterator
401: * returned by iterator().
402: */
403: public Object getIteratorLock() {
404: return list;
405: }
406:
407: /**
408: * Returns an iterator over the runnables.
409: */
410: public Iterator iterator() {
411: return new Iterator() {
412: Link head = (Link) list.getHead();
413: Link link;
414:
415: public boolean hasNext() {
416: if (head == null) {
417: return false;
418: }
419: if (link == null) {
420: return true;
421: }
422: return link != head;
423: }
424:
425: public Object next() {
426: if (head == null || head == link) {
427: throw new java.util.NoSuchElementException();
428: }
429: if (link == null) {
430: link = (Link) head.getNext();
431: return head.runnable;
432: }
433: Object result = link.runnable;
434: link = (Link) link.getNext();
435: return result;
436: }
437:
438: public void remove() {
439: throw new UnsupportedOperationException();
440: }
441: };
442: }
443:
444: /**
445: * Sets the RunHandler for this queue.
446: */
447: public synchronized void setRunHandler(RunHandler rh) {
448: runHandler = rh;
449: }
450:
451: /**
452: * Returns the RunHandler or null.
453: */
454: public synchronized RunHandler getRunHandler() {
455: return runHandler;
456: }
457:
458: /**
459: * Called when execution is being suspended.
460: * Currently just notifies runHandler
461: */
462: protected synchronized void executionSuspended() {
463: if (runHandler != null) {
464: runHandler.executionSuspended(this );
465: }
466: }
467:
468: /**
469: * Called when execution is being resumed.
470: * Currently just notifies runHandler
471: */
472: protected synchronized void executionResumed() {
473: if (runHandler != null) {
474: runHandler.executionResumed(this );
475: }
476: }
477:
478: /**
479: * Called when a Runnable completes.
480: * Currently just notifies runHandler
481: * @param rable The runnable that just completed.
482: */
483: protected synchronized void runnableInvoked(Runnable rable) {
484: if (runHandler != null) {
485: runHandler.runnableInvoked(this , rable);
486: }
487: }
488:
489: /**
490: * This interface must be implemented by an object which wants to
491: * be notified of run events.
492: */
493: public interface RunHandler {
494:
495: /**
496: * Called when the given Runnable has just been invoked and
497: * has returned.
498: */
499: void runnableInvoked(RunnableQueue rq, Runnable r);
500:
501: /**
502: * Called when the execution of the queue has been suspended.
503: */
504: void executionSuspended(RunnableQueue rq);
505:
506: /**
507: * Called when the execution of the queue has been resumed.
508: */
509: void executionResumed(RunnableQueue rq);
510: }
511:
512: /**
513: * To store a Runnable.
514: */
515: protected static class Link extends DoublyLinkedList.Node {
516:
517: /**
518: * The Runnable.
519: */
520: public Runnable runnable;
521:
522: /**
523: * Creates a new link.
524: */
525: public Link(Runnable r) {
526: runnable = r;
527: }
528:
529: /**
530: * unlock link and notify locker.
531: * Basic implementation does nothing.
532: */
533: public void unlock() throws InterruptedException {
534: return;
535: }
536: }
537:
538: /**
539: * To store a Runnable with an object waiting for him to be executed.
540: */
541: protected static class LockableLink extends Link {
542:
543: /**
544: * Whether this link is actually locked.
545: */
546: protected boolean locked;
547:
548: /**
549: * Creates a new link.
550: */
551: public LockableLink(Runnable r) {
552: super (r);
553: }
554:
555: /**
556: * Whether the link is actually locked.
557: */
558: public boolean isLocked() {
559: return locked;
560: }
561:
562: /**
563: * Locks this link.
564: */
565: public synchronized void lock() throws InterruptedException {
566: locked = true;
567: notify();
568: wait();
569: }
570:
571: /**
572: * unlocks this link.
573: */
574: public synchronized void unlock() throws InterruptedException {
575: while (!locked) {
576: // Wait until lock is called...
577: wait();
578: }
579: // Wake the locking thread...
580: notify();
581: }
582: }
583: }
|