001: /*
002:
003: Licensed to the Apache Software Foundation (ASF) under one or more
004: contributor license agreements. See the NOTICE file distributed with
005: this work for additional information regarding copyright ownership.
006: The ASF licenses this file to You under the Apache License, Version 2.0
007: (the "License"); you may not use this file except in compliance with
008: the License. You may obtain a copy of the License at
009:
010: http://www.apache.org/licenses/LICENSE-2.0
011:
012: Unless required by applicable law or agreed to in writing, software
013: distributed under the License is distributed on an "AS IS" BASIS,
014: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: See the License for the specific language governing permissions and
016: limitations under the License.
017:
018: */
019: package org.apache.batik.util;
020:
021: import java.util.Iterator;
022: import java.util.NoSuchElementException;
023:
024: /**
025: * This class represents an object which queues Runnable objects for
026: * invocation in a single thread.
027: *
028: * @author <a href="mailto:stephane@hillion.org">Stephane Hillion</a>
029: * @version $Id: RunnableQueue.java 501495 2007-01-30 18:00:36Z dvholten $
030: */
031: public class RunnableQueue implements Runnable {
032:
033: /**
034: * Type-safe enumeration of queue states.
035: */
036: public static final class RunnableQueueState {
037:
038: private final String value;
039:
040: private RunnableQueueState(String value) {
041: this .value = value;
042: }
043:
044: public String getValue() {
045: return value;
046: }
047:
048: public String toString() {
049: return "[RunnableQueueState: " + value + ']';
050: }
051: }
052:
053: /**
054: * The queue is in the process of running tasks.
055: */
056: public static final RunnableQueueState RUNNING = new RunnableQueueState(
057: "Running");
058:
059: /**
060: * The queue may still be running tasks but as soon as possible
061: * will go to SUSPENDED state.
062: */
063: public static final RunnableQueueState SUSPENDING = new RunnableQueueState(
064: "Suspending");
065:
066: /**
067: * The queue is no longer running any tasks and will not
068: * run any tasks until resumeExecution is called.
069: */
070: public static final RunnableQueueState SUSPENDED = new RunnableQueueState(
071: "Suspended");
072:
073: /**
074: * The Suspension state of this thread.
075: */
076: protected volatile RunnableQueueState state;
077:
078: /**
079: * Object to synchronize/wait/notify for suspension
080: * issues.
081: */
082: protected final Object stateLock = new Object();
083:
084: /**
085: * Used to indicate if the queue was resumed while
086: * still running, so a 'resumed' event can be sent.
087: */
088: protected boolean wasResumed;
089:
090: /**
091: * The Runnable objects list, also used as synchronization point
092: * for pushing/poping runables.
093: */
094: private final DoublyLinkedList list = new DoublyLinkedList();
095:
096: /**
097: * Count of preempt entries in queue, so preempt entries
098: * can be kept properly ordered.
099: */
100: protected int preemptCount;
101:
102: /**
103: * The object which handle run events.
104: */
105: protected RunHandler runHandler;
106:
107: /**
108: * The current thread.
109: */
110: protected volatile HaltingThread runnableQueueThread;
111:
112: /**
113: * The {@link IdleRunnable} to run if the queue is empty.
114: */
115: private IdleRunnable idleRunnable;
116:
117: /**
118: * The time (in milliseconds) that the idle runnable should be run next.
119: */
120: private long idleRunnableWaitTime;
121:
122: /**
123: * Creates a new RunnableQueue started in a new thread.
124: * @return a RunnableQueue which is guaranteed to have entered its
125: * <tt>run()</tt> method.
126: */
127: public static RunnableQueue createRunnableQueue() {
128: RunnableQueue result = new RunnableQueue();
129: synchronized (result) { // todo ?? sync on local object has no meaning ??
130: HaltingThread ht = new HaltingThread(result,
131: "RunnableQueue-" + threadCount++);
132: ht.setDaemon(true);
133: ht.start();
134: while (result.getThread() == null) {
135: try {
136: result.wait();
137: } catch (InterruptedException ie) {
138: }
139: }
140: }
141: return result;
142: }
143:
144: private static volatile int threadCount;
145:
146: /**
147: * Runs this queue.
148: */
149: public void run() {
150: synchronized (this ) {
151: runnableQueueThread = (HaltingThread) Thread
152: .currentThread();
153: // Wake the create method so it knows we are in
154: // our run and ready to go.
155: notify();
156: }
157:
158: Link l;
159: Runnable rable;
160: try {
161: while (!HaltingThread.hasBeenHalted()) {
162: boolean callSuspended = false;
163: boolean callResumed = false;
164: // Mutex for suspension work.
165: synchronized (stateLock) {
166: if (state != RUNNING) {
167: state = SUSPENDED;
168: callSuspended = true;
169: }
170: }
171: if (callSuspended)
172: executionSuspended();
173:
174: synchronized (stateLock) {
175: while (state != RUNNING) {
176: state = SUSPENDED;
177:
178: // notify suspendExecution in case it is
179: // waiting til we shut down.
180: stateLock.notifyAll();
181:
182: // Wait until resumeExecution called.
183: try {
184: stateLock.wait();
185: } catch (InterruptedException ie) {
186: }
187: }
188:
189: if (wasResumed) {
190: wasResumed = false;
191: callResumed = true;
192: }
193: }
194:
195: if (callResumed)
196: executionResumed();
197:
198: // The following seriously stress tests the class
199: // for stuff happening between the two sync blocks.
200: //
201: // try {
202: // Thread.sleep(1);
203: // } catch (InterruptedException ie) { }
204:
205: synchronized (list) {
206: if (state == SUSPENDING)
207: continue;
208: l = (Link) list.pop();
209: if (preemptCount != 0)
210: preemptCount--;
211: if (l == null) {
212: // No item to run, see if there is an idle runnable
213: // to run instead.
214: if (idleRunnable != null
215: && (idleRunnableWaitTime = idleRunnable
216: .getWaitTime()) < System
217: .currentTimeMillis()) {
218: rable = idleRunnable;
219: } else {
220: // Wait for a runnable.
221: try {
222: if (idleRunnable != null
223: && idleRunnableWaitTime != Long.MAX_VALUE) {
224: long t = idleRunnableWaitTime
225: - System
226: .currentTimeMillis();
227: if (t > 0) {
228: list.wait(t);
229: }
230: } else {
231: list.wait();
232: }
233: } catch (InterruptedException ie) {
234: // just loop again.
235: }
236: continue; // start loop over again...
237: }
238: } else {
239: rable = l.runnable;
240: }
241: }
242:
243: runnableStart(rable);
244:
245: try {
246: rable.run();
247: } catch (ThreadDeath td) {
248: // Let it kill us...
249: throw td;
250: } catch (Throwable t) {
251: // Might be nice to notify someone directly.
252: // But this is more or less what Swing does.
253: t.printStackTrace();
254: }
255: // Notify something waiting on the runnable just completed,
256: // if we just ran one from the queue.
257: if (l != null) {
258: l.unlock();
259: }
260: runnableInvoked(rable);
261: }
262: } finally {
263: synchronized (this ) {
264: runnableQueueThread = null;
265: }
266: }
267: }
268:
269: /**
270: * Returns the thread in which the RunnableQueue is currently running.
271: * @return null if the RunnableQueue has not entered his
272: * <tt>run()</tt> method.
273: */
274: public HaltingThread getThread() {
275: return runnableQueueThread;
276: }
277:
278: /**
279: * Schedules the given Runnable object for a later invocation, and
280: * returns.
281: * An exception is thrown if the RunnableQueue was not started.
282: * @throws IllegalStateException if getThread() is null.
283: */
284: public void invokeLater(Runnable r) {
285: if (runnableQueueThread == null) {
286: throw new IllegalStateException(
287: "RunnableQueue not started or has exited");
288: }
289: synchronized (list) {
290: list.push(new Link(r));
291: list.notify();
292: }
293: }
294:
295: /**
296: * Waits until the given Runnable's <tt>run()</tt> has returned.
297: * <em>Note: <tt>invokeAndWait()</tt> must not be called from the
298: * current thread (for example from the <tt>run()</tt> method of the
299: * argument).</em>
300: * @throws IllegalStateException if getThread() is null or if the
301: * thread returned by getThread() is the current one.
302: */
303: public void invokeAndWait(Runnable r) throws InterruptedException {
304: if (runnableQueueThread == null) {
305: throw new IllegalStateException(
306: "RunnableQueue not started or has exited");
307: }
308: if (runnableQueueThread == Thread.currentThread()) {
309: throw new IllegalStateException(
310: "Cannot be called from the RunnableQueue thread");
311: }
312:
313: LockableLink l = new LockableLink(r);
314: synchronized (list) {
315: list.push(l);
316: list.notify();
317: }
318: l.lock();
319: }
320:
321: /**
322: * Schedules the given Runnable object for a later invocation, and
323: * returns. The given runnable preempts any runnable that is not
324: * currently executing (ie the next runnable started will be the
325: * one given). An exception is thrown if the RunnableQueue was
326: * not started.
327: * @throws IllegalStateException if getThread() is null.
328: */
329: public void preemptLater(Runnable r) {
330: if (runnableQueueThread == null) {
331: throw new IllegalStateException(
332: "RunnableQueue not started or has exited");
333: }
334: synchronized (list) {
335: list.add(preemptCount, new Link(r));
336: preemptCount++;
337: list.notify();
338: }
339: }
340:
341: /**
342: * Waits until the given Runnable's <tt>run()</tt> has returned.
343: * The given runnable preempts any runnable that is not currently
344: * executing (ie the next runnable started will be the one given).
345: * <em>Note: <tt>preemptAndWait()</tt> must not be called from the
346: * current thread (for example from the <tt>run()</tt> method of the
347: * argument).
348: * @throws IllegalStateException if getThread() is null or if the
349: * thread returned by getThread() is the current one.
350: */
351: public void preemptAndWait(Runnable r) throws InterruptedException {
352: if (runnableQueueThread == null) {
353: throw new IllegalStateException(
354: "RunnableQueue not started or has exited");
355: }
356: if (runnableQueueThread == Thread.currentThread()) {
357: throw new IllegalStateException(
358: "Cannot be called from the RunnableQueue thread");
359: }
360:
361: LockableLink l = new LockableLink(r);
362: synchronized (list) {
363: list.add(preemptCount, l);
364: preemptCount++;
365: list.notify();
366: }
367: l.lock();
368: }
369:
370: public RunnableQueueState getQueueState() {
371: synchronized (stateLock) {
372: return state;
373: }
374: }
375:
376: /**
377: * Suspends the execution of this queue after the current runnable
378: * completes.
379: * @param waitTillSuspended if true this method will not return
380: * until the queue has suspended (no runnable in progress
381: * or about to be in progress). If resumeExecution is
382: * called while waiting will simply return (this really
383: * indicates a race condition in your code). This may
384: * return before an associated RunHandler is notified.
385: * @throws IllegalStateException if getThread() is null.
386: */
387: public void suspendExecution(boolean waitTillSuspended) {
388: if (runnableQueueThread == null) {
389: throw new IllegalStateException(
390: "RunnableQueue not started or has exited");
391: }
392: // System.err.println("Suspend Called");
393: synchronized (stateLock) {
394: wasResumed = false;
395:
396: if (state == SUSPENDED) {
397: // already suspended, notify stateLock so an event is
398: // generated.
399: stateLock.notifyAll();
400: return;
401: }
402:
403: if (state == RUNNING) {
404: state = SUSPENDING;
405: synchronized (list) {
406: // Wake up run thread if it is waiting for jobs,
407: // so we go into the suspended case (notifying
408: // run-handler etc...)
409: list.notify();
410: }
411: }
412:
413: if (waitTillSuspended) {
414: while (state == SUSPENDING) {
415: try {
416: stateLock.wait();
417: } catch (InterruptedException ie) {
418: }
419: }
420: }
421: }
422: }
423:
424: /**
425: * Resumes the execution of this queue.
426: * @throws IllegalStateException if getThread() is null.
427: */
428: public void resumeExecution() {
429: // System.err.println("Resume Called");
430: if (runnableQueueThread == null) {
431: throw new IllegalStateException(
432: "RunnableQueue not started or has exited");
433: }
434:
435: synchronized (stateLock) {
436: wasResumed = true;
437:
438: if (state != RUNNING) {
439: state = RUNNING;
440: stateLock.notifyAll(); // wake it up.
441: }
442: }
443: }
444:
445: /**
446: * Returns iterator lock to use to work with the iterator
447: * returned by iterator().
448: */
449: public Object getIteratorLock() {
450: return list;
451: }
452:
453: /**
454: * Returns an iterator over the runnables.
455: */
456: public Iterator iterator() {
457: return new Iterator() {
458: Link head = (Link) list.getHead();
459: Link link;
460:
461: public boolean hasNext() {
462: if (head == null) {
463: return false;
464: }
465: if (link == null) {
466: return true;
467: }
468: return link != head;
469: }
470:
471: public Object next() {
472: if (head == null || head == link) {
473: throw new NoSuchElementException();
474: }
475: if (link == null) {
476: link = (Link) head.getNext();
477: return head.runnable;
478: }
479: Object result = link.runnable;
480: link = (Link) link.getNext();
481: return result;
482: }
483:
484: public void remove() {
485: throw new UnsupportedOperationException();
486: }
487: };
488: }
489:
490: /**
491: * Sets the RunHandler for this queue.
492: */
493: public synchronized void setRunHandler(RunHandler rh) {
494: runHandler = rh;
495: }
496:
497: /**
498: * Returns the RunHandler or null.
499: */
500: public synchronized RunHandler getRunHandler() {
501: return runHandler;
502: }
503:
504: /**
505: * Sets a Runnable to be run whenever the queue is empty.
506: */
507: public void setIdleRunnable(IdleRunnable r) {
508: synchronized (list) {
509: idleRunnable = r;
510: idleRunnableWaitTime = 0;
511: list.notify();
512: }
513: }
514:
515: /**
516: * Called when execution is being suspended.
517: * Currently just notifies runHandler
518: */
519: protected synchronized void executionSuspended() {
520: // System.err.println("Suspend Sent");
521: if (runHandler != null) {
522: runHandler.executionSuspended(this );
523: }
524: }
525:
526: /**
527: * Called when execution is being resumed.
528: * Currently just notifies runHandler
529: */
530: protected synchronized void executionResumed() {
531: // System.err.println("Resumed Sent");
532: if (runHandler != null) {
533: runHandler.executionResumed(this );
534: }
535: }
536:
537: /**
538: * Called just prior to executing a Runnable.
539: * Currently just notifies runHandler
540: * @param rable The runnable that is about to start
541: */
542: protected synchronized void runnableStart(Runnable rable) {
543: if (runHandler != null) {
544: runHandler.runnableStart(this , rable);
545: }
546: }
547:
548: /**
549: * Called when a Runnable completes.
550: * Currently just notifies runHandler
551: * @param rable The runnable that just completed.
552: */
553: protected synchronized void runnableInvoked(Runnable rable) {
554: if (runHandler != null) {
555: runHandler.runnableInvoked(this , rable);
556: }
557: }
558:
559: /**
560: * A {@link Runnable} that can also inform the caller how long it should
561: * be until it is run again.
562: */
563: public interface IdleRunnable extends Runnable {
564:
565: /**
566: * Returns the system time that can be safely waited until before this
567: * {@link Runnable} is run again.
568: *
569: * @return time to wait until, <code>0</code> if no waiting can
570: * be done, or {@link Long#MAX_VALUE} if the {@link Runnable}
571: * should not be run again at this time
572: */
573: long getWaitTime();
574: }
575:
576: /**
577: * This interface must be implemented by an object which wants to
578: * be notified of run events.
579: */
580: public interface RunHandler {
581:
582: /**
583: * Called just prior to invoking the runnable
584: */
585: void runnableStart(RunnableQueue rq, Runnable r);
586:
587: /**
588: * Called when the given Runnable has just been invoked and
589: * has returned.
590: */
591: void runnableInvoked(RunnableQueue rq, Runnable r);
592:
593: /**
594: * Called when the execution of the queue has been suspended.
595: */
596: void executionSuspended(RunnableQueue rq);
597:
598: /**
599: * Called when the execution of the queue has been resumed.
600: */
601: void executionResumed(RunnableQueue rq);
602: }
603:
604: /**
605: * This is an adapter class that implements the RunHandler interface.
606: * It simply does nothing in response to the calls.
607: */
608: public static class RunHandlerAdapter implements RunHandler {
609:
610: /**
611: * Called just prior to invoking the runnable
612: */
613: public void runnableStart(RunnableQueue rq, Runnable r) {
614: }
615:
616: /**
617: * Called when the given Runnable has just been invoked and
618: * has returned.
619: */
620: public void runnableInvoked(RunnableQueue rq, Runnable r) {
621: }
622:
623: /**
624: * Called when the execution of the queue has been suspended.
625: */
626: public void executionSuspended(RunnableQueue rq) {
627: }
628:
629: /**
630: * Called when the execution of the queue has been resumed.
631: */
632: public void executionResumed(RunnableQueue rq) {
633: }
634: }
635:
636: /**
637: * To store a Runnable.
638: */
639: protected static class Link extends DoublyLinkedList.Node {
640:
641: /**
642: * The Runnable.
643: */
644: private final Runnable runnable;
645:
646: /**
647: * Creates a new link.
648: */
649: public Link(Runnable r) {
650: runnable = r;
651: }
652:
653: /**
654: * unlock link and notify locker.
655: * Basic implementation does nothing.
656: */
657: public void unlock() {
658: return;
659: }
660: }
661:
662: /**
663: * To store a Runnable with an object waiting for him to be executed.
664: */
665: protected static class LockableLink extends Link {
666:
667: /**
668: * Whether this link is actually locked.
669: */
670: private volatile boolean locked;
671:
672: /**
673: * Creates a new link.
674: */
675: public LockableLink(Runnable r) {
676: super (r);
677: }
678:
679: /**
680: * Whether the link is actually locked.
681: */
682: public boolean isLocked() {
683: return locked;
684: }
685:
686: /**
687: * Locks this link.
688: */
689: public synchronized void lock() throws InterruptedException {
690: locked = true;
691: notify();
692: wait();
693: }
694:
695: /**
696: * unlocks this link.
697: */
698: public synchronized void unlock() {
699: while (!locked) {
700: try {
701: wait(); // Wait until lock is called...
702: } catch (InterruptedException ie) {
703: // Loop again...
704: }
705: }
706: locked = false;
707: // Wake the locking thread...
708: notify();
709: }
710: }
711: }
|