001: /*
002: *
003: *
004: * Portions Copyright 2000-2007 Sun Microsystems, Inc. All Rights
005: * Reserved. Use is subject to license terms.
006: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER
007: *
008: * This program is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU General Public License version
010: * 2 only, as published by the Free Software Foundation.
011: *
012: * This program is distributed in the hope that it will be useful, but
013: * WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * General Public License version 2 for more details (a copy is
016: * included at /legal/license.txt).
017: *
018: * You should have received a copy of the GNU General Public License
019: * version 2 along with this work; if not, write to the Free Software
020: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
021: * 02110-1301 USA
022: *
023: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa
024: * Clara, CA 95054 or visit www.sun.com if you need additional
025: * information or have any questions.
026: */
027:
028: /*****************************************************************************
029: * Copyright (C) The Apache Software Foundation. All rights reserved. *
030: * ------------------------------------------------------------------------- *
031: * This software is published under the terms of the Apache Software License *
032: * version 1.1, a copy of which has been included with this distribution in *
033: * the LICENSE file. *
034: *****************************************************************************/package com.sun.perseus.util;
035:
036: import com.sun.perseus.platform.ThreadSupport;
037:
038: /**
039: * This class represents an object which queues Runnable objects for
040: * invocation in a single thread.
041: *
042: * This class is derived from work done in the Batik project but was
043: * seriously modified and extended.
044: *
045: * @version $Id: RunnableQueue.java,v 1.5 2006/04/21 06:35:50 st125089 Exp $
046: */
047: public final class RunnableQueue implements Runnable {
048: /**
049: * The queue is in the processes of running tasks.
050: */
051: public static final String RUNNING = "Running";
052:
053: /**
054: * The queue may still be running tasks but as soon as possible
055: * will go to SUSPENDED state.
056: */
057: public static final String SUSPENDING = "Suspending";
058:
059: /**
060: * The queue is no longer running any tasks and will not
061: * run any tasks until resumeExecution is called.
062: */
063: public static final String SUSPENDED = "Suspended";
064:
065: /**
066: * This queue has been interrupted
067: */
068: public static final String TERMINATED = "Terminated";
069:
070: /**
071: * The default RunnableQueue instance.
072: */
073: protected static RunnableQueue defaultQueue;
074:
075: /**
076: * The Suspension state of this thread.
077: */
078: protected String state;
079:
080: /**
081: * Object to synchronize/wait/notify for suspension
082: * issues.
083: */
084: protected Object stateLock = new Object();
085:
086: /**
087: * The Scheduler which can run Runnables at a fixed
088: * rate.
089: */
090: protected Scheduler scheduler = new Scheduler(this );
091:
092: /**
093: * The Runnable objects list, also used as synchoronization point
094: * for pushing/poping runables.
095: */
096: protected DoublyLinkedList list = new DoublyLinkedList();
097:
098: /**
099: * The object which handles RunnableQueue events.
100: */
101: protected RunnableQueueHandler queueHandler;
102:
103: /**
104: * The current thread.
105: */
106: protected Thread runnableQueueThread;
107:
108: /**
109: * Used for the RunnableQueue's thread names.
110: * @see #createRunnableQueue
111: */
112: private static int threadCount;
113:
114: /**
115: * All <code>RunnableQueue</code> instances should be created through
116: * the <code>createRunnableQueue</code> method.
117: *
118: * @see #createRunnableQueue
119: */
120: private RunnableQueue() {
121: }
122:
123: /**
124: * Returns the default <code>RunnableQueue</code> instance. This is what
125: * should be used in most circumstances. In particular, all document
126: * instances which need to process in a seperate thread should share this
127: * default RunnableQueue.
128: *
129: * @return the default <code>RunnableQueue</code> instance.
130: */
131: public static RunnableQueue getDefault() {
132: if (defaultQueue != null) {
133: return defaultQueue;
134: }
135:
136: defaultQueue = RunnableQueue
137: .createRunnableQueue(new VoidQueueHandler());
138: defaultQueue.resumeExecution();
139:
140: return defaultQueue;
141: }
142:
143: /**
144: * Creates a new RunnableQueue started in a new thread.
145: *
146: * @param queueHandler the <tt>RunnableQueueHandler</tt> which will be notified
147: * of the <tt>RunnableQueue</tt>'s activity. May be null.
148: * @return a RunnableQueue which is garanteed to have entered its
149: * <tt>run()</tt> method.
150: */
151: public static RunnableQueue createRunnableQueue(
152: RunnableQueueHandler queueHandler) {
153: RunnableQueue result = new RunnableQueue();
154:
155: // Configure the RunHandler
156: if (queueHandler == null) {
157: queueHandler = new VoidQueueHandler();
158: }
159: result.queueHandler = queueHandler;
160:
161: // Start the thread. We use the RunnableQueue instance as
162: // a lock to synchronize between this method and the
163: // run method (called from the RunnableQueue thread)
164: synchronized (result) {
165: Thread t = new Thread(result, "RunnableQueue-"
166: + threadCount++);
167: ThreadSupport.setDaemon(t, true);
168: t.setPriority(Thread.MIN_PRIORITY);
169: t.start();
170: while (result.getThread() == null) {
171: try {
172: // See the run() method. It calls notify on
173: // the RunnableQueue instance to notify us
174: // that the thread has started executing
175: result.wait();
176: } catch (InterruptedException ie) {
177: }
178: }
179: }
180:
181: // Wait until we get into suspended state. State changes
182: // are synchronized with the stateLock lock.
183: synchronized (result.stateLock) {
184: try {
185: while (result.state != SUSPENDED) {
186: result.stateLock.wait();
187: }
188: } catch (InterruptedException ie) {
189: }
190: }
191:
192: return result;
193: }
194:
195: /**
196: * Runs this queue. Implements the <code>Runnable</code> interface.
197: */
198: public void run() {
199: //
200: // This object is used as a lock to synchronize on the
201: // queue's thread execution start.
202: //
203: synchronized (this ) {
204: runnableQueueThread = Thread.currentThread();
205: // Wake the create method so it knows we are in
206: // our run and ready to go.
207: notify();
208: }
209:
210: Link l = null;
211: Runnable rable = null, sRable;
212: long t = 0;
213: long wait = 0;
214:
215: try {
216: while (!ThreadSupport.isInterrupted(Thread.currentThread())) {
217: // Mutex for suspending/resuming work.
218: synchronized (stateLock) {
219: if (state != RUNNING) {
220: state = SUSPENDED;
221:
222: // notify suspendExecution in case it is
223: // waiting til we shut down.
224: stateLock.notifyAll();
225:
226: queueHandler.executionSuspended(this );
227:
228: while (state != RUNNING) {
229: state = SUSPENDED;
230: // Wait until resumeExecution called.
231: stateLock.wait();
232: }
233:
234: // notify resumeExecution as it waits until
235: // execution are really resumed
236: stateLock.notifyAll();
237: queueHandler.executionResumed(this );
238: }
239: }
240:
241: // First, run the Scheduler to take care of all the pending
242: // fixed rate Runnables.
243: t = System.currentTimeMillis();
244: scheduler.run(t);
245:
246: synchronized (list) {
247: l = (Link) list.pop();
248: if (l == null) {
249: // Wait until the next scheduled runnable
250: wait = scheduler.nextRun(System
251: .currentTimeMillis());
252:
253: if (wait == 0) {
254: wait = 1;
255: }
256:
257: if (state == SUSPENDING) {
258: continue;
259: }
260:
261: if (wait > 0) {
262: list.wait(wait);
263: } else {
264: // There is no scheduled runnable at this point.
265: list.wait();
266: }
267:
268: continue; // start loop over again...
269: }
270:
271: rable = l.runnable;
272: }
273:
274: try {
275: rable.run();
276: } catch (Exception e) {
277: // Might be nice to notify someone directly.
278: // But this is more or less what Swing does.
279: e.printStackTrace();
280: }
281:
282: if (l.runHandler != null) {
283: l.runHandler.runnableInvoked(this , rable);
284: }
285:
286: l.unlock();
287: rable = null;
288: }
289: } catch (InterruptedException e) {
290: if (this == defaultQueue) {
291: defaultQueue = null;
292: }
293: e.printStackTrace();
294: } finally {
295: if (this == defaultQueue) {
296: defaultQueue = null;
297: }
298: System.err
299: .println(">>>>>>>>>>>>>> RunnableQueue terminating");
300: synchronized (this ) {
301: runnableQueueThread = null;
302: }
303: synchronized (stateLock) {
304: state = TERMINATED;
305: stateLock.notifyAll();
306: }
307: }
308: }
309:
310: /**
311: * Returns the thread in which the RunnableQueue is currently running.
312: * @return null if the RunnableQueue has not entered his
313: * <tt>run()</tt> method.
314: */
315: public Thread getThread() {
316: return runnableQueueThread;
317: }
318:
319: /**
320: * Removes all pending <code>Runnable</code>s.
321: */
322: public void empty() {
323: synchronized (list) {
324: list.empty();
325: }
326: }
327:
328: /**
329: * @return the number of pending runnables
330: */
331: public int getSize() {
332: synchronized (list) {
333: return list.getSize();
334: }
335: }
336:
337: /**
338: * @return the next pending runnable
339: */
340: public Runnable getNextPending() {
341: synchronized (list) {
342: if (list.getSize() == 0) {
343: return null;
344: } else {
345: return ((Link) list.getHead()).runnable;
346: }
347: }
348: }
349:
350: /**
351: * Schedules the input <code>Runnable</code> at the requested
352: * fixed rate. The <code>RunnableQueue</code> offers a 'best'
353: * effort service meaning that it will schedule the <code>Runnable</code>
354: * as soon as possible so that the time between the begining of
355: * two consecutive runs of the <code>Runnable</code> is as close
356: * as possible to the requested rate. Note that a too high rate
357: * may cause the rest of the <code>Runnable</code> in the
358: * <code>RunnableQueue</code> to be starved and never get
359: * executed.
360: *
361: * @param r the <code>Runnable</code> to schedule at a regular
362: * interval. If null, there won't be any <code>Runnable</code>
363: * scheduled and if there was a current one, it won't be executed
364: * any more.
365: * @param runHandler the <code>RunnableHandler</code> to notify
366: * once the <code>Runnable</code> has finished executing.
367: * Should not be null.
368: * @param interval the minimum interval between to consecutive
369: * executions of the input <code>Runnable</code>. The
370: * value is in milliseconds.
371: *
372: * @throws IllegalArgumentException If this parameter is zero or less,
373: * and <code>r</code> is not null.
374: *
375: */
376: public void scheduleAtFixedRate(final Runnable r,
377: final RunnableHandler runHandler, final long interval) {
378: scheduler.add(r, interval, runHandler);
379:
380: // In case the queue is running and waiting for an item in the
381: // list, notify the list so that we can get the animation loop
382: // going. See the run() method.
383: synchronized (list) {
384: list.notify();
385: }
386: }
387:
388: /**
389: * Removes the input <code>Runnable</code> from the list of
390: * Runnables scheduled at a fixed rate. If the Runnable is not
391: * currently scheduled at a fixed rate, then this method does
392: * nothing. If this Runnable was scheduled multiple times
393: * with this RunnableQueue, then all instances are removed.
394: *
395: * @param r the Runnable that should no longer be scheduled at a
396: * fixed rate.
397: * @see #scheduleAtFixedRate
398: */
399: public void unschedule(final Runnable r) {
400: scheduler.remove(r);
401: }
402:
403: /**
404: * Schedules the given Runnable object for a later invocation, and
405: * returns.
406: * An exception is thrown if the RunnableQueue was not started.
407: *
408: * @param r the <code>Runnable</code> to put at the end of the
409: * execution list.
410: * @param runHandler the <code>RunnableHandler</code> to notify
411: * once the <code>Runnable</code> has finished executing.
412: * Should not be null.
413: * @throws IllegalStateException if getThread() is null.
414: */
415: public void invokeLater(final Runnable r,
416: final RunnableHandler runHandler) {
417: if (runnableQueueThread == null) {
418: throw new IllegalStateException(
419: "RunnableQueue not started or has exited");
420: }
421:
422: synchronized (list) {
423: list.push(new Link(r, runHandler));
424: list.notify();
425: }
426: }
427:
428: /**
429: * Waits until the given Runnable's <tt>run()</tt> has returned.
430: * <em>Note: <tt>invokeAndWait()</tt> must not be called from the
431: * current thread (for example from the <tt>run()</tt> method of the
432: * argument).
433: *
434: * @param r the <code>Runnable</code> to put at the end of the
435: * execution list.
436: * @param runHandler the <code>RunnableHandler</code> to notify
437: * once the <code>Runnable</code> has finished executing.
438: * Should not be null.
439: * @throws IllegalStateException if getThread() is null or if the
440: * thread returned by getThread() is the current one.
441: * @throws InterruptedException if the thread is interrupted while
442: * waiting for the input <code>Runnable</code> to complete
443: * its execution.
444: */
445: public void invokeAndWait(final Runnable r,
446: final RunnableHandler runHandler)
447: throws InterruptedException {
448:
449: if (runnableQueueThread == null) {
450: throw new IllegalStateException(
451: "RunnableQueue not started or has exited");
452: }
453: if (runnableQueueThread == Thread.currentThread()) {
454: throw new IllegalStateException(
455: "Cannot be called from the RunnableQueue thread");
456: }
457:
458: LockableLink l = new LockableLink(r, runHandler);
459: synchronized (list) {
460: list.push(l);
461: list.notify();
462: }
463: l.lock();
464: }
465:
466: /**
467: * Waits until the given Runnable's <tt>run()</tt> has returned.
468: * <em>Note: <tt>safeInvokeAndWait()</tt> may be called from any thread.
469: * This method checks if this thread is the update thread, in which case
470: * the Runnable is invoked directly. Otherwise, it delegates to the
471: * invokeAndWait method.
472: *
473: * @param r the <code>Runnable</code> to put at the end of the
474: * execution list. Should not be null.
475: * @param runHandler the <code>RunnableHandler</code> to notify
476: * once the <code>Runnable</code> has finished executing.
477: * Should not be null.
478: * @throws IllegalStateException if getThread() is null or if the
479: * thread returned by getThread() is the current one.
480: */
481: public void safeInvokeAndWait(final Runnable r,
482: final RunnableHandler runHandler) {
483: if (runnableQueueThread == Thread.currentThread()) {
484: r.run();
485: runHandler.runnableInvoked(this , r);
486: }
487:
488: try {
489: invokeAndWait(r, runHandler);
490: } catch (InterruptedException ie) {
491: // We are in a bad state because the thread was interrupted while
492: // waiting for the runnable to complete.
493: throw new IllegalStateException();
494: }
495: }
496:
497: /**
498: * Schedules the given Runnable object for a later invocation, and
499: * returns. The given runnable preempts any runnable that is not
500: * currently executing (ie the next runnable started will be the
501: * one given). An exception is thrown if the RunnableQueue was
502: * not started.
503: *
504: * @param r the <code>Runnable</code> to put at the front of the
505: * execution list.
506: * @param runHandler the <code>RunnableHandler</code> to notify
507: * once the <code>Runnable</code> has finished executing.
508: * Should not be null.
509: * @throws IllegalStateException if getThread() is null.
510: */
511: public void preemptLater(final Runnable r,
512: final RunnableHandler runHandler) {
513: if (runnableQueueThread == null) {
514: throw new IllegalStateException(
515: "RunnableQueue not started or has exited");
516: }
517: synchronized (list) {
518: list.unpop(new Link(r, runHandler));
519: list.notify();
520: }
521: }
522:
523: /**
524: * Waits until the given Runnable's <tt>run()</tt> has returned.
525: * The given runnable preempts any runnable that is not currently
526: * executing (ie the next runnable started will be the one given).
527: * <em>Note: <tt>preemptAndWait()</tt> must not be called from the
528: * current thread (for example from the <tt>run()</tt> method of the
529: * argument).
530: *
531: * @param r the <code>Runnable</code> to execute
532: * @param runHandler the <code>RunnableHandler</code> to notify
533: * once the <code>Runnable</code> has finished executing.
534: * Should not be null.
535: * @throws IllegalStateException if getThread() is null or if the
536: * thread returned by getThread() is the current one.
537: * @throws InterruptedException if the thread is interrupted while
538: * waiting for the completion of the input <code>Runnable</code>
539: * to complete execution.
540: */
541: public void preemptAndWait(final Runnable r,
542: final RunnableHandler runHandler)
543: throws InterruptedException {
544:
545: if (runnableQueueThread == null) {
546: throw new IllegalStateException(
547: "RunnableQueue not started or has exited");
548: }
549: if (runnableQueueThread == Thread.currentThread()) {
550: throw new IllegalStateException(
551: "Cannot be called from the RunnableQueue thread");
552: }
553:
554: LockableLink l = new LockableLink(r, runHandler);
555: synchronized (list) {
556: list.unpop(l);
557: list.notify();
558: }
559: l.lock();
560: }
561:
562: /**
563: * @return this queue's state, one of RUNNING, SUSPENDING,
564: * SUSPENDED or TERMINATED
565: */
566: public String getQueueState() {
567: synchronized (stateLock) {
568: return state;
569: }
570: }
571:
572: /**
573: * Suspends the execution of this queue after the current runnable
574: * completes.
575: * @param waitTillSuspended if true this method will not return
576: * until the queue has suspended (no runnable in progress
577: * or about to be in progress). If resumeExecution is
578: * called while waiting will simply return (this really
579: * indicates a race condition in your code). This may
580: * return before an associated RunHandler is notified.
581: * @throws IllegalStateException if getThread() is null.
582: */
583: public void suspendExecution(final boolean waitTillSuspended) {
584: if (runnableQueueThread == null) {
585: throw new IllegalStateException(
586: "RunnableQueue not started or has exited");
587: }
588: synchronized (stateLock) {
589: if (state == SUSPENDED) {
590: // already suspended...
591: return;
592: }
593:
594: if (state == RUNNING) {
595: state = SUSPENDING;
596: synchronized (list) {
597: // Wake up run thread if it is waiting for jobs,
598: // so we go into the suspended case (notifying
599: // run-handler etc...)
600: list.notify();
601: }
602: }
603:
604: if (waitTillSuspended) {
605: try {
606: stateLock.wait();
607: } catch (InterruptedException ie) {
608: }
609: }
610: }
611: }
612:
613: /**
614: * Resumes the execution of this queue.
615: * @throws IllegalStateException if getThread() is null.
616: */
617: public void resumeExecution() {
618: if (runnableQueueThread == null) {
619: throw new IllegalStateException(
620: "RunnableQueue not started or has exited");
621: }
622:
623: synchronized (stateLock) {
624: if (state != RUNNING) {
625: state = RUNNING;
626: stateLock.notifyAll(); // wake it up.
627: try {
628: // Wait until we have really resumed
629: stateLock.wait();
630: } catch (InterruptedException ie) {
631: // The calling thread was interrupted
632: }
633: }
634: }
635: }
636:
637: /**
638: * This interface must be implemented by an object which wants to
639: * be notified of Runnable execution.
640: */
641: public interface RunnableHandler {
642: /**
643: * Called when the given Runnable has just been invoked and
644: * has returned.
645: *
646: * @param rq the <code>RunnableQueue</code> on which the
647: * <code>Runnable</code> was just invoked.
648: * @param r the <code>Runnable</code> that just
649: * executed.
650: */
651: void runnableInvoked(RunnableQueue rq, Runnable r);
652: }
653:
654: /**
655: * This interface must be implemented by an object which wants to
656: * be notified of the RunnableQueue's execution activity (resumed
657: * or suspended.
658: */
659: public interface RunnableQueueHandler {
660: /**
661: * Called when the execution of the queue has been suspended.
662: *
663: * @param rq the <code>RunnableQueue</code> whose execution was
664: * suspended.
665: */
666: void executionSuspended(RunnableQueue rq);
667:
668: /**
669: * Called when the execution of the queue has been resumed.
670: *
671: * @param rq the <code>RunnableQueue</code> whose execution has
672: * resumed.
673: */
674: void executionResumed(RunnableQueue rq);
675: }
676:
677: /**
678: * This implementation of the RunnableQueueHandler is used for the
679: * default RunnableQueue.
680: */
681: public static class VoidQueueHandler implements
682: RunnableQueueHandler {
683: /**
684: * Called when the execution of the queue has been suspended.
685: *
686: * @param rq the <code>RunnableQueue</code> whose execution was
687: * suspended.
688: */
689: public void executionSuspended(RunnableQueue rq) {
690: // Do nothing.
691: }
692:
693: /**
694: * Called when the execution of the queue has been resumed.
695: *
696: * @param rq the <code>RunnableQueue</code> whose execution has
697: * resumed.
698: */
699: public void executionResumed(RunnableQueue rq) {
700: // Do nothing.
701: }
702: }
703:
704: /**
705: * To store a Runnable.
706: */
707: protected static class Link extends DoublyLinkedList.Node {
708: /**
709: * The Runnable.
710: */
711: protected Runnable runnable;
712:
713: /**
714: * The RunnableHandler.
715: */
716: protected RunnableHandler runHandler;
717:
718: /**
719: * Creates a new link.
720: *
721: * @param r the <code>Runnable</code> this link is associated with.
722: * @param runHandler the <code>RunnableHandler</code> to notify when
723: * the <code>Runnable</code> has bee executed.
724: */
725: public Link(final Runnable r, final RunnableHandler runHandler) {
726: runnable = r;
727: this .runHandler = runHandler;
728: }
729:
730: /**
731: * unlock link and notify locker.
732: * Basic implementation does nothing.
733: *
734: * @throws InterruptedException if the unlocking thread is interrupted
735: * while waiting for a notification from the locking thread
736: */
737: public void unlock() throws InterruptedException {
738: return;
739: }
740: }
741:
742: /**
743: * To store a Runnable with an object waiting for him to be executed.
744: */
745: protected static class LockableLink extends Link {
746:
747: /**
748: * Whether this link is actually locked.
749: */
750: protected boolean locked;
751:
752: /**
753: * Creates a new link.
754: *
755: * @param r the link's associated <code>Runnable</code>
756: * @param runHandler the <code>RunnableHandler</code> to notify when
757: * the <code>Runnable</code> has bee executed.
758: */
759: public LockableLink(final Runnable r,
760: final RunnableHandler runHandler) {
761: super (r, runHandler);
762: }
763:
764: /**
765: * Locks this link.
766: *
767: * @throws InterruptedException if the thread is interrupted
768: * while waiting for a notification from the unlocking
769: * thread.
770: */
771: public synchronized void lock() throws InterruptedException {
772: locked = true;
773: notify();
774: wait();
775: }
776:
777: /**
778: * Unlocks this link.
779: *
780: * @throws InterruptedException if the thread is interrupted while
781: * waiting for the link to unlock
782: */
783: public synchronized void unlock() throws InterruptedException {
784: while (!locked) {
785: // Wait until lock is called...
786: wait();
787: }
788: // Wake the locking thread...
789: notify();
790: }
791: }
792: }
|