0001: /* Copyright (c) 2001-2005, The HSQL Development Group
0002: * All rights reserved.
0003: *
0004: * Redistribution and use in source and binary forms, with or without
0005: * modification, are permitted provided that the following conditions are met:
0006: *
0007: * Redistributions of source code must retain the above copyright notice, this
0008: * list of conditions and the following disclaimer.
0009: *
0010: * Redistributions in binary form must reproduce the above copyright notice,
0011: * this list of conditions and the following disclaimer in the documentation
0012: * and/or other materials provided with the distribution.
0013: *
0014: * Neither the name of the HSQL Development Group nor the names of its
0015: * contributors may be used to endorse or promote products derived from this
0016: * software without specific prior written permission.
0017: *
0018: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
0019: * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
0020: * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
0021: * ARE DISCLAIMED. IN NO EVENT SHALL HSQL DEVELOPMENT GROUP, HSQLDB.ORG,
0022: * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
0023: * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
0024: * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
0025: * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
0026: * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0027: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
0028: * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0029: */
0030:
0031: package org.hsqldb.lib;
0032:
0033: import java.util.Date;
0034:
0035: /**
0036: * Facility to schedule tasks for future execution in a background thread. <p>
0037: *
0038: * Tasks may be scheduled for one-time execution or for repeated execution at
0039: * regular intervals, using either fixed rate or fixed delay policy. <p>
0040: *
0041: * This class is a JDK 1.1 compatible implementation required by HSQLDB both
0042: * because the java.util.Timer class is available only in JDK 1.3+ and because
0043: * java.util.Timer starves least recently added tasks under high load and
0044: * fixed rate scheduling, especially when the average actual task duration is
0045: * greater than the average requested task periodicity. <p>
0046: *
0047: * An additional (minor) advantage over java.util.Timer is that this class does
0048: * not retain a live background thread during periods when the task queue is
0049: * empty.
0050: * @author boucherb@users
0051: * @version 1.8.0.3
0052: * @since 1.7.2
0053: */
0054: public final class HsqlTimer implements ObjectComparator, ThreadFactory {
0055:
0056: /** The priority queue for the scheduled tasks. */
0057: protected final TaskQueue taskQueue = new TaskQueue(16,
0058: (ObjectComparator) this );
0059:
0060: /** The inner runnable that executes tasks in the background thread. */
0061: protected final TaskRunner taskRunner = new TaskRunner();
0062:
0063: /** The background thread. */
0064: protected Thread taskRunnerThread;
0065:
0066: /** The factory that procduces the background threads. */
0067: protected final ThreadFactory threadFactory;
0068:
0069: /**
0070: * Whether this timer should disallow all further processing.
0071: *
0072: * Once set true, stays true forever.
0073: */
0074: protected volatile boolean isShutdown;
0075:
/**
 * Creates a new HsqlTimer whose background threads are produced by the
 * default thread factory implementation supplied by this class.
 */
public HsqlTimer() {
    this((ThreadFactory) null);
}
0083:
/**
 * Creates a new HsqlTimer that obtains its background threads from the
 * given thread factory.
 *
 * @param threadFactory the ThreadFactory used to produce this timer's
 *      background threads. When null, this timer acts as its own
 *      factory (see {@link #newThread(Runnable)}).
 */
public HsqlTimer(final ThreadFactory threadFactory) {
    if (threadFactory == null) {
        this.threadFactory = this;
    } else {
        this.threadFactory = threadFactory;
    }
}
0097:
0098: /**
0099: * Required to back the priority queue for scheduled tasks.
0100: *
0101: * @param a the first Task
0102: * @param b the second Task
0103: * @return 0 if equal, < 0 if a < b, > 0 if a > b
0104: */
0105: public int compare(final Object a, final Object b) {
0106:
0107: final long awhen = ((Task) (a)).getNextScheduled();
0108: final long bwhen = ((Task) (b)).getNextScheduled();
0109:
0110: return (awhen < bwhen) ? -1 : (awhen == bwhen) ? 0 : 1;
0111: }
0112:
0113: /**
0114: * Default ThreadFactory implementation. <p>
0115: *
0116: * Contructs a new Thread from the designated runnable, sets its
0117: * name to "HSQLDB Timer @" + Integer.toHexString(hashCode()),
0118: * and sets it as a daemon thread. <p>
0119: *
0120: * @param runnable used to construct the new Thread.
0121: * @return a new Thread constructed from the designated runnable.
0122: */
0123: public Thread newThread(final Runnable runnable) {
0124:
0125: final Thread thread = new Thread(runnable);
0126:
0127: thread.setName("HSQLDB Timer @"
0128: + Integer.toHexString(hashCode()));
0129: thread.setDaemon(true);
0130:
0131: return thread;
0132: }
0133:
0134: /**
0135: * Retrieves the background execution thread. <p>
0136: *
0137: * null is returned if there is no such thread. <p>
0138: *
0139: * @return the current background thread (may be null)
0140: */
0141: public synchronized Thread getThread() {
0142: return this .taskRunnerThread;
0143: }
0144:
0145: /**
0146: * (Re)starts background processing of the task queue.
0147: *
0148: * @throws IllegalStateException if this timer is shut down.
0149: * @see #shutdown()
0150: * @see #shutdownImmediately()
0151: */
0152: public synchronized void restart() throws IllegalStateException {
0153:
0154: if (this .isShutdown) {
0155: throw new IllegalStateException("isShutdown==true");
0156: } else if (this .taskRunnerThread == null) {
0157: this .taskRunnerThread = this .threadFactory
0158: .newThread(this .taskRunner);
0159:
0160: this .taskRunnerThread.start();
0161: } else {
0162: this .taskQueue.unpark();
0163: }
0164: }
0165:
0166: /**
0167: * Causes the specified Runnable to be executed once in the background
0168: * after the specified delay.
0169: *
0170: * @param delay in milliseconds
0171: * @param runnable the Runnable to execute.
0172: * @return opaque reference to the internal task
0173: * @throws IllegalArgumentException if runnable is null
0174: */
0175: public Object scheduleAfter(final long delay,
0176: final Runnable runnable) throws IllegalArgumentException {
0177:
0178: if (runnable == null) {
0179: throw new IllegalArgumentException("runnable == null");
0180: }
0181:
0182: return this .addTask(now() + delay, runnable, 0, false);
0183: }
0184:
0185: /**
0186: * Causes the specified Runnable to be executed once in the background
0187: * at the specified time.
0188: *
0189: * @param date time at which to execute the specified Runnable
0190: * @param runnable the Runnable to execute.
0191: * @return opaque reference to the internal task
0192: * @throws IllegalArgumentException if date or runnable is null
0193: */
0194: public Object scheduleAt(final Date date, final Runnable runnable)
0195: throws IllegalArgumentException {
0196:
0197: if (date == null) {
0198: throw new IllegalArgumentException("date == null");
0199: } else if (runnable == null) {
0200: throw new IllegalArgumentException("runnable == null");
0201: }
0202:
0203: return this .addTask(date.getTime(), runnable, 0, false);
0204: }
0205:
0206: /**
0207: * Causes the specified Runnable to be executed periodically in the
0208: * background, starting at the specified time.
0209: *
0210: * @return opaque reference to the internal task
0211: * @param period the cycle period
0212: * @param relative if true, fixed rate sheduling else fixed delay scheduling
0213: * @param date time at which to execute the specified Runnable
0214: * @param runnable the Runnable to execute
0215: * @throws IllegalArgumentException if date or runnable is null, or
0216: * period is <= 0
0217: */
0218: public Object schedulePeriodicallyAt(final Date date,
0219: final long period, final Runnable runnable,
0220: final boolean relative) throws IllegalArgumentException {
0221:
0222: if (date == null) {
0223: throw new IllegalArgumentException("date == null");
0224: } else if (period <= 0) {
0225: throw new IllegalArgumentException("period <= 0");
0226: } else if (runnable == null) {
0227: throw new IllegalArgumentException("runnable == null");
0228: }
0229:
0230: return addTask(date.getTime(), runnable, period, relative);
0231: }
0232:
0233: /**
0234: * Causes the specified Runnable to be executed periodically in the
0235: * background, starting after the specified delay.
0236: *
0237: * @return opaque reference to the internal task
0238: * @param period the cycle period
0239: * @param relative if true, fixed rate sheduling else fixed delay scheduling
0240: * @param delay in milliseconds
0241: * @param runnable the Runnable to execute.
0242: * @throws IllegalArgumentException if runnable is null or period is <= 0
0243: */
0244: public Object schedulePeriodicallyAfter(final long delay,
0245: final long period, final Runnable runnable,
0246: final boolean relative) throws IllegalArgumentException {
0247:
0248: if (period <= 0) {
0249: throw new IllegalArgumentException("period <= 0");
0250: } else if (runnable == null) {
0251: throw new IllegalArgumentException("runnable == null");
0252: }
0253:
0254: return addTask(now() + delay, runnable, period, relative);
0255: }
0256:
0257: /**
0258: * Shuts down this timer after the current task (if any) completes. <p>
0259: *
0260: * After this call, the timer has permanently entered the shutdown state;
0261: * attempting to schedule any new task or directly restart this timer will
0262: * result in an IllegalStateException. <p>
0263: *
0264: */
0265: public synchronized void shutdown() {
0266:
0267: if (!this .isShutdown) {
0268: this .isShutdown = true;
0269:
0270: this .taskQueue.cancelAllTasks();
0271: }
0272: }
0273:
0274: /**
0275: * Shuts down this timer immediately, interrupting the wait state associated
0276: * with the current head of the task queue or the wait state internal to
0277: * the currently executing task, if any such state is currently in effect.
0278: *
0279: * After this call, the timer has permanently entered the shutdown state;
0280: * attempting to schedule any new task or directly restart this timer will
0281: * result in an IllegalStateException. <p>
0282: *
0283: * <b>Note:</b> If the integrity of work performed by a scheduled task
0284: * may be adversely affected by an unplanned interruption, it is the
0285: * responsibility of the task's implementation to deal correctly with the
0286: * possibility that this method is called while such work is in progress,
0287: * for instance by catching the InterruptedException, completing the work,
0288: * and then rethrowing the exception.
0289: */
0290: public synchronized void shutdownImmediately() {
0291:
0292: if (!this .isShutdown) {
0293: final Thread runner = this .taskRunnerThread;
0294:
0295: this .isShutdown = true;
0296:
0297: if (runner != null && runner.isAlive()) {
0298: runner.interrupt();
0299: }
0300: ;
0301:
0302: this .taskQueue.cancelAllTasks();
0303: }
0304: }
0305:
0306: /**
0307: * Causes the task referenced by the supplied argument to be cancelled.
0308: * If the referenced task is currently executing, it will continue until
0309: * finished but will not be rescheduled.
0310: *
0311: * @param task a task reference
0312: */
0313: public static void cancel(final Object task) {
0314:
0315: if (task instanceof Task) {
0316: ((Task) task).cancel();
0317: }
0318: }
0319:
0320: /**
0321: * Retrieves whether the specified argument references a cancelled task.
0322: *
0323: * @param task a task reference
0324: * @return true if referenced task is cancelled
0325: */
0326: public static boolean isCancelled(final Object task) {
0327: return (task instanceof Task) ? ((Task) task).isCancelled()
0328: : true;
0329: }
0330:
0331: /**
0332: * Retrieves whether the specified argument references a task scheduled
0333: * periodically using fixed rate scheduling.
0334: *
0335: * @param task a task reference
0336: * @return true if the task is scheduled at a fixed rate
0337: */
0338: public static boolean isFixedRate(final Object task) {
0339:
0340: if (task instanceof Task) {
0341: final Task ltask = (Task) task;
0342:
0343: return (ltask.relative && ltask.period > 0);
0344: } else {
0345: return false;
0346: }
0347: }
0348:
0349: /**
0350: * Retrieves whether the specified argument references a task scheduled
0351: * periodically using fixed delay scheduling.
0352: *
0353: * @param task a task reference
0354: * @return true if the reference is scheduled using a fixed delay
0355: */
0356: public static boolean isFixedDelay(final Object task) {
0357:
0358: if (task instanceof Task) {
0359: final Task ltask = (Task) task;
0360:
0361: return (!ltask.relative && ltask.period > 0);
0362: } else {
0363: return false;
0364: }
0365: }
0366:
0367: /**
0368: * Retrieves whether the specified argument references a task scheduled
0369: * for periodic execution.
0370: *
0371: * @param task a task reference
0372: * @return true if the task is scheduled for periodic execution
0373: */
0374: public static boolean isPeriodic(final Object task) {
0375: return (task instanceof Task) ? (((Task) task).period > 0)
0376: : false;
0377: }
0378:
0379: /**
0380: * Retrieves the last time the referenced task was executed, as a
0381: * Date object. If the task has never been executed, null is returned.
0382: *
0383: * @param task a task reference
0384: * @return the last time the referenced task was executed; null if never
0385: */
0386: public static Date getLastScheduled(Object task) {
0387:
0388: if (task instanceof Task) {
0389: final Task ltask = (Task) task;
0390: final long last = ltask.getLastScheduled();
0391:
0392: return (last == 0) ? null : new Date(last);
0393: } else {
0394: return null;
0395: }
0396: }
0397:
0398: /**
0399: * Sets the periodicity of the designated task to a new value. <p>
0400: *
0401: * If the designated task is cancelled or the new period is identical to the
0402: * task's current period, then this invocation has essentially no effect
0403: * and the submitted object is returned. <p>
0404: *
0405: * Otherwise, if the new period is greater than the designated task's
0406: * current period, then a simple assignment occurs and the submittted
0407: * object is returned. <p>
0408: *
0409: * If neither case holds, then the designated task is cancelled and a new,
0410: * equivalent task with the new period is scheduled for immediate first
0411: * execution and returned to the caller. <p>
0412: *
0413: * @return a task reference, as per the rules stated above.
0414: * @param task the task whose periodicity is to be set
0415: * @param period the new period
0416: */
0417: public static Object setPeriod(final Object task, final long period) {
0418: return (task instanceof Task) ? ((Task) task).setPeriod(period)
0419: : task;
0420: }
0421:
0422: /**
0423: * Retrieves the next time the referenced task is due to be executed, as a
0424: * Date object. If the referenced task is cancelled, null is returned.
0425: *
0426: * @param task a task reference
0427: * @return the next time the referenced task is due to be executed
0428: */
0429: public static Date getNextScheduled(Object task) {
0430:
0431: if (task instanceof Task) {
0432: final Task ltask = (Task) task;
0433: final long next = ltask.isCancelled() ? 0 : ltask
0434: .getNextScheduled();
0435:
0436: return next == 0 ? null : new Date(next);
0437: } else {
0438: return null;
0439: }
0440: }
0441:
0442: /**
0443: * Adds to the task queue a new Task object encapsulating the supplied
0444: * Runnable and scheduling arguments.
0445: *
0446: * @param first the time of the task's first execution
0447: * @param runnable the Runnable to execute
0448: * @param period the task's periodicity
0449: * @param relative if true, use fixed rate else use fixed delay scheduling
0450: * @return an opaque reference to the internal task
0451: */
0452: protected Task addTask(final long first, final Runnable runnable,
0453: final long period, boolean relative) {
0454:
0455: if (this .isShutdown) {
0456: throw new IllegalStateException("shutdown");
0457: }
0458:
0459: final Task task = new Task(first, runnable, period, relative);
0460:
0461: // sychronized
0462: this .taskQueue.addTask(task);
0463:
0464: // sychronized
0465: this .restart();
0466:
0467: return task;
0468: }
0469:
0470: /** Sets the background thread to null. */
0471: protected synchronized void clearThread() {
0472: taskRunnerThread = null;
0473: }
0474:
0475: /**
0476: * Retrieves the next task to execute, or null if this timer is shutdown,
0477: * the current thread is interrupted, or there are no queued tasks.
0478: *
0479: * @return the next task to execute, or null
0480: */
0481: protected Task nextTask() {
0482:
0483: try {
0484: while (!this .isShutdown || Thread.interrupted()) {
0485: long now;
0486: long next;
0487: long wait;
0488: Task task;
0489:
0490: // synchronized to ensure removeTask
0491: // applies only to the peeked task,
0492: // when the computed wait <= 0
0493: synchronized (this .taskQueue) {
0494: task = this .taskQueue.peekTask();
0495:
0496: if (task == null) {
0497:
0498: // queue is empty
0499: break;
0500: }
0501:
0502: now = System.currentTimeMillis();
0503: next = task.next;
0504: wait = (next - now);
0505:
0506: if (wait > 0) {
0507:
0508: // release ownership of taskQueue monitor and await
0509: // notification of task addition or cancellation,
0510: // at most until the time when the peeked task is
0511: // next supposed to execute
0512: this .taskQueue.park(wait);
0513:
0514: continue; // to top of loop
0515: } else {
0516: this .taskQueue.removeTask();
0517: }
0518: }
0519:
0520: long period = task.period;
0521:
0522: if (period > 0) { // repeated task
0523: if (task.relative) { // using fixed rate shceduling
0524: final long late = (now - next);
0525:
0526: if (late > period) {
0527:
0528: // ensure that really late tasks don't
0529: // completely saturate the head of the
0530: // task queue
0531: period = 0; // TODO: is -1, -2 ... fairer?
0532: } else if (late > 0) {
0533:
0534: // compensate for scheduling overruns
0535: period -= late;
0536: }
0537: }
0538:
0539: task.updateSchedule(now, now + period);
0540: this .taskQueue.addTask(task);
0541: }
0542:
0543: return task;
0544: }
0545: } catch (InterruptedException e) {
0546:
0547: //e.printStackTrace();
0548: }
0549:
0550: return null;
0551: }
0552:
0553: /**
0554: * stats var
0555: */
0556: static int nowCount = 0;
0557:
0558: /**
0559: * Convenience method replacing the longer incantation:
0560: * System.currentTimeMillis()
0561: *
0562: * @return System.currentTimeMillis()
0563: */
0564: private static long now() {
0565:
0566: nowCount++;
0567:
0568: return System.currentTimeMillis();
0569: }
0570:
0571: /**
0572: * The Runnable that the background thread uses to execute
0573: * scheduled tasks. <p>
0574: *
0575: * <b>Note:</b> Outer class could simply implement Runnable,
0576: * but using an inner class protects the public run method
0577: * from potential abuse.
0578: */
0579: protected class TaskRunner implements Runnable {
0580:
0581: /**
0582: * Runs the next available task in the background thread. <p>
0583: *
0584: * When there are no available tasks, the background
0585: * thread dies and its instance field is cleared until
0586: * tasks once again become available.
0587: */
0588: public void run() {
0589:
0590: try {
0591: do {
0592: final Task task = HsqlTimer.this .nextTask();
0593:
0594: if (task == null) {
0595: break;
0596: }
0597:
0598: // PROBLEM: If the runnable throws an exception other
0599: // than InterruptedException (which likely stems
0600: // naturally from calling shutdownImmediately()
0601: // or getThread().interrupt()), this will still
0602: // cause the loop to exit, which is to say that
0603: // task scheduling will stop until a new task is
0604: // added or the timer is restarted directly, even
0605: // though there may still be uncancelled tasks
0606: // left on the queue.
0607: //
0608: // TODO: Clarify and establish a contract regarding
0609: // the difference between InterruptedException,
0610: // RuntimeException and other things, like
0611: // UndeclaredThrowableException.
0612: //
0613: // SOL'N: At present, we simply require each runnable to
0614: // understand its part of the implicit contract,
0615: // which is to deal with exceptions internally
0616: // (not throw them up to the timer), with the
0617: // possible exception of InterruptedException.
0618: //
0619: // If the integrity of work performed by the
0620: // runnable may be adversely affected by an
0621: // unplanned interruption, the runnable should
0622: // deal with this directly, for instance by
0623: // catching the InterruptedException, ensuring
0624: // that some integrity preserving state is
0625: // attained, and then rethrowing the exception.
0626: task.runnable.run();
0627: } while (true);
0628: } finally {
0629: HsqlTimer.this .clearThread();
0630: }
0631: }
0632: }
0633:
0634: /**
0635: * Encapsulates a Runnable and its scheduling attributes.
0636: *
0637: * Essentially, a wrapper class used to schedule a Runnable object
0638: * for execution by the enclosing HsqlTimer's TaskRunner in a
0639: * background thread.
0640: */
0641: protected class Task {
0642:
0643: /** What to run. */
0644: Runnable runnable;
0645:
0646: /** The periodic interval, or 0 if one-shot. */
0647: long period;
0648:
0649: /** The time this task was last executed, or 0 if never. */
0650: long last;
0651:
0652: /** The next time this task is scheduled to execute. */
0653: long next;
0654:
0655: /**
0656: * Whether to silently remove this task instead of running it,
0657: * the next time (if ever) it makes its way to the head of the
0658: * timer queue.
0659: */
0660: boolean cancelled = false;
0661:
0662: /** Serializes concurrent access to the cancelled field. */
0663: private Object cancel_mutex = new Object();
0664:
0665: /**
0666: * Scheduling policy flag. <p>
0667: *
0668: * When true, scheduling is fixed rate (as opposed to fixed delay),
0669: * and schedule updates are calculated relative to when the task was
0670: * was last run rather than a fixed delay starting from the current
0671: * wall-clock time provided by System.currentTimeMillis(). <p>
0672: *
0673: * This helps normalize scheduling for tasks that must attempt to
0674: * maintain a fixed rate of execution.
0675: */
0676: final boolean relative;
0677:
0678: /**
0679: * Constructs a new Task object encapulating the specified Runnable
0680: * and scheduling arguments.
0681: *
0682: * @param first the first time to execute
0683: * @param runnable the Runnable to execute
0684: * @param period the periodicity of execution
0685: * @param relative if true, use fixed rate scheduling else fixed delay
0686: */
0687: Task(final long first, final Runnable runnable,
0688: final long period, final boolean relative) {
0689:
0690: this .next = first;
0691: this .runnable = runnable;
0692: this .period = period;
0693: this .relative = relative;
0694: }
0695:
0696: /** Sets this task's cancelled flag true and signals its taskQueue. */
0697: void cancel() {
0698:
0699: synchronized (cancel_mutex) {
0700: if (!cancelled) {
0701: cancelled = true;
0702:
0703: HsqlTimer.this .taskQueue.signalTaskCancelled(this );
0704: }
0705: }
0706: }
0707:
0708: /**
0709: * Retrieves whether this task is cancelled.
0710: *
0711: * @return true if cancelled, else false
0712: */
0713: boolean isCancelled() {
0714:
0715: synchronized (cancel_mutex) {
0716: return cancelled;
0717: }
0718: }
0719:
0720: /**
0721: * Retrieves the instant in time just before this task was
0722: * last executed by the background thread. A value of zero
0723: * indicates that this task has never been executed.
0724: *
0725: * @return the last time this task was executed or zero if never
0726: */
0727: synchronized long getLastScheduled() {
0728: return last;
0729: }
0730:
0731: /**
0732: * Retrieves the time at which this task is next scheduled for
0733: * execution.
0734: *
0735: * @return the time at which this task is next scheduled for
0736: * execution
0737: */
0738: synchronized long getNextScheduled() {
0739: return next;
0740: }
0741:
0742: /**
0743: * Updates the last and next scheduled execution times.
0744: *
0745: * @param last when this task was last executed
0746: * @param next when this task is to be next executed
0747: */
0748: synchronized void updateSchedule(final long last,
0749: final long next) {
0750: this .last = last;
0751: this .next = next;
0752: }
0753:
0754: /**
0755: * Sets the new periodicity of this task in milliseconds. <p>
0756: *
0757: * If this task is cancelled or the new period is identical to the
0758: * current period, then this invocation has essentailly no effect
0759: * and this object is returned. <p>
0760: *
0761: * Otherwise, if the new period is greater than the current period, then
0762: * a simple field assignment occurs and this object is returned. <p>
0763: *
0764: * If none of the previous cases hold, then this task is cancelled and
0765: * a new, equivalent task with the new period is scheduled for
0766: * immediate first execution and returned to the caller. <p>
0767: *
0768: * @param period the new period
0769: * @return a task reference, as per the rules stated above.
0770: */
0771: synchronized Object setPeriod(final long newPeriod) {
0772:
0773: if (this .period == newPeriod || this .isCancelled()) {
0774: return this ;
0775: } else if (newPeriod > this .period) {
0776: this .period = newPeriod;
0777:
0778: return this ;
0779: } else {
0780: this .cancel();
0781:
0782: return HsqlTimer.this .addTask(now(), this .runnable,
0783: newPeriod, this .relative);
0784: }
0785: }
0786: }
0787:
0788: /**
0789: * Heap-based priority queue.
0790: *
0791: * Provides extensions to facilitate and simplify implementing
0792: * timer functionality.
0793: */
0794: protected static class TaskQueue extends HsqlArrayHeap {
0795:
0796: /**
0797: * Constructs a new TaskQueue with the specified initial capacity and
0798: * ObjectComparator.
0799: *
0800: * @param capacity the initial capacity of the queue
0801: * @param oc The ObjectComparator this queue uses to maintain its
0802: * Heap invariant.
0803: */
0804: TaskQueue(final int capacity, final ObjectComparator oc) {
0805: super (capacity, oc);
0806: }
0807:
0808: /**
0809: * Type-safe add method. <p>
0810: *
0811: * Can be used to inject debugging or accounting behaviour. <p>
0812: *
0813: * @param task the task to add
0814: */
0815: void addTask(final Task task) {
0816:
0817: // System.out.println("task added: " + task);
0818: super .add(task);
0819: }
0820:
0821: /**
0822: * Atomically removes all tasks in this queue and then and cancels
0823: * them.
0824: */
0825: void cancelAllTasks() {
0826:
0827: Object[] oldHeap;
0828: int oldCount;
0829:
0830: synchronized (this ) {
0831: oldHeap = this .heap;
0832: oldCount = this .count;
0833:
0834: // 1 instead of 0 to avoid unintended aoob exceptions
0835: this .heap = new Object[1];
0836: this .count = 0;
0837: }
0838:
0839: for (int i = 0; i < oldCount; i++) {
0840: ((Task) oldHeap[i]).cancelled = true;
0841: }
0842: }
0843:
0844: /**
0845: * Causes the calling thread to wait until another thread invokes
0846: * {@link #unpark() unpark} or the specified amount of time has
0847: * elapsed.
0848: *
0849: * Implements the sync & wait(n) half of this queue's availability
0850: * condition. <p>
0851: *
0852: * @param timeout the maximum time to wait in milliseconds.
0853: * @throws java.lang.InterruptedException if another thread has
0854: * interrupted the current thread. The <i>interrupted status</i> of
0855: * the current thread is cleared when this exception is thrown.
0856: */
0857: synchronized void park(final long timeout)
0858: throws InterruptedException {
0859: this .wait(timeout);
0860: }
0861:
0862: /**
0863: * Retrieves the head of this queue, without removing it. <p>
0864: *
0865: * This method has the side-effect of removing tasks from the
0866: * head of this queue until a non-cancelled task is encountered
0867: * or this queue is empty. <p>
0868: *
0869: * If this queue is initially empty or is emptied in the process
0870: * of finding the earliest scheduled non-cancelled task,
0871: * then null is returned. <p>
0872: *
0873: * @return the earliest scheduled non-cancelled task, or null if no such
0874: * task exists
0875: */
0876: synchronized Task peekTask() {
0877:
0878: while (super .heap[0] != null
0879: && ((Task) super .heap[0]).isCancelled()) {
0880: super .remove();
0881: }
0882:
0883: return (Task) super .heap[0];
0884: }
0885:
0886: /**
0887: * Informs this queue that the given task is supposedly cancelled. <p>
0888: *
0889: * If the indicated task is identical to the current head of
0890: * this queue, then it is removed and this queue is
0891: * {@link #unpark() unparked}. <p>
0892: *
0893: * The cancelled status of the given task is not verified; it is
0894: * assumed that the caller is well-behaved (always passes a
0895: * non-null reference to a cancelled task).
0896: *
0897: * @param task a supposedly cancelled task
0898: */
0899: synchronized void signalTaskCancelled(Task task) {
0900:
0901: // We only care about the case where HsqlTimer.nextTask
0902: // might be parked momentarily on this task.
0903: if (task == super .heap[0]) {
0904: super .remove();
0905: this .notify();
0906: }
0907: }
0908:
0909: /**
0910: * Type-safe remove method. <p>
0911: *
0912: * Removes the head task from this queue. <p>
0913: *
0914: * Can be used to inject debugging or accounting behaviour. <p>
0915: *
0916: * @return this queue's head task or null if no such task exists
0917: */
0918: Task removeTask() {
0919:
0920: // System.out.println("removing task...");
0921: return (Task) super .remove();
0922: }
0923:
0924: /**
0925: * Wakes up a single thread (if any) that is waiting on this queue's
0926: * {@link #park(long) park} method.
0927: *
0928: * Implements the sync & notify half of this queue's availability
0929: * condition.
0930: */
0931: synchronized void unpark() {
0932: this .notify();
0933: }
0934: }
0935:
0936: // ---------------------------------- tests ------------------------------------
0937: // /**
0938: // * Computes the system-specific average {@link java.io.FileDescriptor#sync()
0939: // * sync} time. <p>
0940: // *
0941: // * @param runs iterations to perform when computing the average
0942: // * @param buff the data to write before each sync call
0943: // * @return the total time to write buff and call sync runs times,
0944: // * divided by runs
0945: // */
0946: // static double avgSyncTime(int runs, byte[] buff) {
0947: // java.io.File file = null;
0948: // java.io.FileOutputStream fos;
0949: // java.io.FileDescriptor fd;
0950: // long start = System.currentTimeMillis();
0951: //
0952: // try {
0953: // file = java.io.File.createTempFile("SyncTest", ".tmp");
0954: // fos = new java.io.FileOutputStream(file);
0955: // fd = fos.getFD();
0956: //
0957: // for (int i = 0; i < runs; i++) {
0958: // fos.write(buff);
0959: // fos.flush();
0960: // fd.sync();
0961: // }
0962: //
0963: // long elapsed = System.currentTimeMillis() - start;
0964: //
0965: // return (elapsed/(double)runs);
0966: // } catch (Exception e) {
0967: // throw new RuntimeException(e);
0968: // } finally {
0969: // if (file != null) {
0970: // file.delete();
0971: // }
0972: // }
0973: // }
0974: //
0975: // /**
0976: // * WRITE_DELAY simulation task.
0977: // *
0978: // * Writes a given buffer to disk, sync's the associated file
0979: // * descriptor and maintains an account of the average period
0980: // * between executions.
0981: // */
0982: // static class WriteAndSyncTask extends java.util.TimerTask {
0983: // // static
0984: // /** Used to make the name of each task unique. */
0985: // static int serial;
0986: // /** The data to write. */
0987: // static final byte[] buf = new byte[256];
0988: //
0989: // // instance
0990: // /** Identifies this task. */
0991: // String name;
0992: // /** The time at which this task was last executed. */
0993: // long last;
0994: // /** A running sum of the periods between executions. */
0995: // long total;
0996: // /** The number of times this task has been executed. */
0997: // int runs;
0998: // /** True until this task is run for the first time. */
0999: // boolean firstTime = true;
1000: // /** The file to write. */
1001: // java.io.File file;
1002: // /** The FileOutputStream to write. */
1003: // java.io.FileOutputStream fos;
1004: // /** The FileDescriptor to sync. */
1005: // java.io.FileDescriptor fd;
1006: //
1007: // /** Constructs a new WriteAndSyncTask */
1008: // WriteAndSyncTask() {
1009: // this.name = "Task." + serial++;
1010: //
1011: // try {
1012: // this.file = java.io.File.createTempFile(name, ".tmp");
1013: // this.fos = new java.io.FileOutputStream(file);
1014: // this.fd = fos.getFD();
1015: // } catch(java.io.IOException ioe) {
1016: // throw new RuntimeException(ioe);
1017: // }
1018: // }
1019: //
1020: // /**
1021: // * Runnable implementation. <p>
1022: // *
1023: // * Does the average period accounting and
1024: // * invokes the writeAndSync method.
1025: // */
1026: // public void run() {
1027: // final long now = System.currentTimeMillis();
1028: //
1029: // if (this.firstTime) {
1030: // this.firstTime = false;
1031: // } else {
1032: // this.total += (now - this.last);
1033: // }
1034: //
1035: // this.last = now;
1036: //
1037: // writeAndSync();
1038: //
1039: // this.runs++;
1040: // }
1041: //
1042: // /**
1043: // * Writes a given buffer to disk and syncs the associated file
1044: // * descriptor.
1045: // */
1046: // void writeAndSync() {
1047: // try {
1048: // this.fos.write(buf);
1049: // this.fos.flush();
1050: // this.fd.sync();
1051: // Thread.sleep(1);
1052: // } catch(Exception e) {
1053: // e.printStackTrace();
1054: // }
1055: // }
1056: //
1057: // /**
1058: // * Closes the FileOutputStream, deletes the file
1059: // * and nullifies Object fields.
1060: // */
1061: // public void release() {
1062: // try {
1063: // this.fos.close();
1064: // } catch(Exception e) {
1065: // e.printStackTrace();
1066: // }
1067: // try {
1068: // this.file.delete();
1069: // } catch (Exception e) {
1070: // e.printStackTrace();
1071: // }
1072: //
1073: // this.fos = null;
1074: // this.file = null;
1075: // this.fd = null;
1076: // }
1077: //
1078: // /**
1079: // * Retrieves the computed moment of actual average periodicity
1080: // * experienced by this task.
1081: // */
1082: // public float getAveragePeriod() {
1083: // return (this.runs < 2) ? Float.NaN
1084: // : (this.total/(float)(this.runs - 1));
1085: // }
1086: //
1087: //
1088: // /**
1089: // * @return the String representation of this task, indicating
1090: // * its name, the number of runs so far and the
1091: // * computed moment of actual average periodicity
1092: // * experienced so far.
1093: // */
1094: // public String toString() {
1095: // return this.name
1096: // + "["
1097: // + "runs: " + runs + ", "
1098: // + "actual avg. period: " + getAveragePeriod()
1099: // + "]";
1100: // }
1101: // }
1102: //
1103: // static class Stats {
1104: // double min;
1105: // double max;
1106: // double pk;
1107: // double sk;
1108: // double vk;
1109: // long n;
1110: // boolean initialized;
1111: // boolean sample;
1112: //
1113: // void addDataPoint(double x) {
1114: //
1115: // double xi;
1116: // double xsi;
1117: // long nm1;
1118: //
1119: // xi = x;
1120: //
1121: // if (!initialized) {
1122: // n = 1;
1123: // pk = xi;
1124: // sk = xi;
1125: // min = xi;
1126: // max = xi;
1127: // vk = 0.0;
1128: // initialized = true;
1129: //
1130: // return;
1131: // }
1132: //
1133: // n++;
1134: //
1135: // nm1 = (n - 1);
1136: // xsi = (sk - (xi * nm1));
1137: // vk += ((xsi * xsi) / n) / nm1;
1138: // sk += xi;
1139: //
1140: // if (xi != 0) {
1141: // pk *= xi;
1142: // }
1143: //
1144: // max = Math.max(max, xi);
1145: // min = Math.min(min, xi);
1146: // }
1147: //
1148: // double getMin() {
1149: // return initialized ? min : Double.NaN;
1150: // }
1151: //
1152: // double getMax() {
1153: // return initialized ? max : Double.NaN;
1154: // }
1155: //
1156: // double getGeometricMean() {
1157: // return initialized ? Math.pow(pk, 1/(double)n) : Double.NaN;
1158: // }
1159: //
1160: // double getVariance() {
1161: //
1162: // if (!initialized) {
1163: // return Double.NaN;
1164: // }
1165: //
1166: // return sample ? (n == 1) ? Double.NaN
1167: // : (vk / (double) (n - 1))
1168: // : (vk / (double) (n));
1169: // }
1170: //
1171: // double getStdDev() {
1172: //
1173: // if (!initialized) {
1174: // return Double.NaN;
1175: // }
1176: //
1177: // return sample ? (n == 1) ? Double.NaN
1178: // : (Math.sqrt(vk
1179: // / (double) (n - 1)))
1180: // : (Math.sqrt(vk / (double) (n)));
1181: // }
1182: // }
1183: //
1184: // /**
1185: // * Runs the HsqlTimer tests.
1186: // * @param args Currently unused
1187: // */
1188: // public static void main(String[] args) {
1189: // // number of tasks to queue
1190: // int taskCount = 10;
1191: // // period, as a multiple of computed system-specific avg. sync time
1192: // double periodMultiplier = 1.4D;
1193: // // how long to run the timer, in milliseconds
1194: // long duration = 2800;
1195: //
1196: // test(taskCount, periodMultiplier, duration);
1197: // }
1198: //
1199: // /**
1200: // * Runs the HsqlTimer and java.util.Timer tests using the given
1201: // * arguments. <p>
1202: // *
1203: // * @param taskCount the number of WriteAndSync tasks to add
1204: // * @param periodMultiplier the period with which to schedule
1205: // * the tasks, as a multiple of the computed, system-specific
1206: // * average sync time.
1207: // * @param duration The number of milliseconds that the foreground
1208: // * Thread should sleep while the specified number of WriteAndSync
1209: // * tasks are running in the background thread
1210: // */
1211: // public static void test(final int taskCount,
1212: // final double periodMultiplier,
1213: // final long duration) {
1214: //
1215: // System.out.println();
1216: // System.out.println("****************************************");
1217: // System.out.println("* org.hsqldb.lib.HsqlTimer tests *");
1218: // System.out.println("****************************************");
1219: // System.out.println();
1220: //
1221: // System.out.println("Computing system-specific avg. sync time.");
1222: // System.out.println("Please wait...");
1223: //
1224: // double avgSyncTime = avgSyncTime(500, new byte[256]);
1225: // double minAvgPeriod = (taskCount * avgSyncTime);
1226: // long period = Math.round(avgSyncTime * periodMultiplier);
1227: //
1228: // System.out.println();
1229: // System.out.println("System-specific avg. sync time : " + avgSyncTime + " ms.");
1230: // System.out.println("Requested task count : " + taskCount);
1231: // System.out.println("Requested task period : " + period + " ms." );
1232: // System.out.println("Min. avg. period (0 starved) : " + minAvgPeriod + " ms." );
1233: // System.out.println("Requested test duration : " + duration + " ms.");
1234: //
1235: // if (period <= minAvgPeriod || minAvgPeriod >= duration) {
1236: // double idealAvgRuns = (duration / minAvgPeriod);
1237: //
1238: // System.out.println("Idealized avg. runs / task : " + (float)idealAvgRuns);
1239: // } else {
1240: // double remainingDuration = (duration - minAvgPeriod);
1241: // double remainingRuns = (remainingDuration / period);
1242: // double idealAvgRuns = (1D + remainingRuns);
1243: //
1244: // System.out.println("Theoretical first cycle time : " + minAvgPeriod);
1245: // System.out.println("Remaining duration : " + remainingDuration);
1246: // System.out.println("Remaining runs : " + remainingRuns);
1247: // System.out.println("Idealized avg. runs per task : " + idealAvgRuns);
1248: // System.out.println("(1 + (requested duration");
1249: // System.out.println(" - theor. first cycle time");
1250: // System.out.println(" ) / requested period)");
1251: // }
1252: //
1253: // testJavaUtilTimer(taskCount, period, duration);
1254: // testHsqlTimer(taskCount, period, duration);
1255: // }
1256: //
1257: //
1258: // /**
1259: // * Runs the java.util.Timer test using the given arguments. <p>
1260: // *
1261: // * @param taskCount the number of WriteAndSync tasks to add
1262: // * @param period the period with which to schedule
1263: // * the tasks, in milliseconds (computed by the caller from the
1264: // * system-specific average sync time)
1265: // * @param duration The number of milliseconds that the foreground
1266: // * Thread should sleep while the specified number of WriteAndSync
1267: // * tasks are running in the background thread
1268: // */
1269: // public static void testJavaUtilTimer(final int taskCount,
1270: // final long period,
1271: // final long duration) {
1272: //
1273: // System.out.println();
1274: // System.out.println("****************************************");
1275: // System.out.println("* java.util.Timer *");
1276: // System.out.println("****************************************");
1277: // System.out.println();
1278: //
1279: // WriteAndSyncTask.serial = 0;
1280: //
1281: // final java.util.Timer timer = new java.util.Timer();
1282: // final WriteAndSyncTask[] tasks = new WriteAndSyncTask[taskCount];
1283: //
1284: // for (int i = 0; i < taskCount; i++) {
1285: // tasks[i] = new WriteAndSyncTask();
1286: // timer.scheduleAtFixedRate(tasks[i], 0, period);
1287: // }
1288: //
1289: // final long start = now();
1290: //
1291: // try {
1292: // Thread.sleep(duration);
1293: // } catch (Exception e) {
1294: // e.printStackTrace();
1295: // }
1296: //
1297: // for (int i = 0; i < tasks.length; i++) {
1298: // tasks[i].cancel();
1299: // }
1300: //
1301: // timer.cancel();
1302: //
1303: // final long elapsed = now() - start;
1304: //
1305: // System.out.println("Actual test duration: " + elapsed + " ms.");
1306: // System.out.println();
1307: //
1308: // printTaskStats(tasks);
1309: // }
1310: //
1311: // /**
1312: // * Runs the HsqlTimer test using the given arguments. <p>
1313: // *
1314: // * @param taskCount the number of WriteAndSync tasks to add
1315: // * @param period the period with which to schedule
1316: // * the tasks, in milliseconds (computed by the caller from the
1317: // * system-specific average sync time)
1318: // * @param duration The number of milliseconds that the foreground
1319: // * Thread should sleep while the specified number of WriteAndSync
1320: // * tasks are running in the background thread
1321: // */
1322: // public static void testHsqlTimer(final int taskCount,
1323: // final long period,
1324: // final long duration) {
1325: //
1326: // System.out.println();
1327: // System.out.println("****************************************");
1328: // System.out.println("* org.hsqldb.lib.HsqlTimer *");
1329: // System.out.println("****************************************");
1330: // System.out.println();
1331: //
1332: // WriteAndSyncTask.serial = 0;
1333: //
1334: // final HsqlTimer timer = new HsqlTimer();
1335: // final WriteAndSyncTask[] tasks = new WriteAndSyncTask[taskCount];
1336: // final Object[] ttasks = new Object[taskCount];
1337: //
1338: // for (int i = 0; i < taskCount; i++) {
1339: // tasks[i] = new WriteAndSyncTask();
1340: // ttasks[i] = timer.schedulePeriodicallyAfter(0, period, tasks[i], true);
1341: // }
1342: //
1343: // final long start = now();
1344: //
1345: // try {
1346: // Thread.sleep(duration);
1347: // } catch (Exception e) {
1348: // e.printStackTrace();
1349: // }
1350: //
1351: // final Thread timerThread = timer.getThread();
1352: //
1353: // for (int i = 0; i < taskCount; i++) {
1354: // timer.cancel(ttasks[i]);
1355: // }
1356: //
1357: // try {
1358: // timerThread.join();
1359: // } catch (Exception e) {
1360: // e.printStackTrace();
1361: // }
1362: //
1363: // final long elapsed = now() - start;
1364: //
1365: // System.out.println("Actual test duration: " + elapsed + " ms.");
1366: // System.out.println();
1367: //
1368: // printTaskStats(tasks);
1369: //
1370: // }
1371: //
1372: // static void printTaskStats(WriteAndSyncTask[] tasks) {
1373: // float avgTotal = 0;
1374: // int avgCount = 0;
1375: // int starved = 0;
1376: // int runs = 0;
1377: // Stats periodStats = new Stats();
1378: // Stats runStats = new Stats();
1379: //
1380: // for (int i = 0; i < tasks.length; i++) {
1381: // if (tasks[i].runs > 1) {
1382: // double avgPeriod = tasks[i].getAveragePeriod();
1383: // periodStats.addDataPoint(avgPeriod);
1384: // avgTotal += avgPeriod;
1385: // avgCount++;
1386: // }
1387: // runs += tasks[i].runs;
1388: // if (tasks[i].runs == 0) {
1389: // starved++;
1390: // }
1391: // runStats.addDataPoint(tasks[i].runs);
1392: // tasks[i].release();
1393: // }
1394: //
1395: // float periodAvg = (avgTotal / avgCount);
1396: // float periodMax = (float) periodStats.getMax();
1397: // int periodMaxCnt = 0;
1398: // float periodMin = (float) periodStats.getMin();
1399: // int periodMinCnt = 0;
1400: // float periodRange = (periodMax - periodMin);
1401: // float periodStddev = (float)periodStats.getStdDev();
1402: // float periodGMean = (float)periodStats.getGeometricMean();
1403: // float periodStddevR = (periodRange / periodStddev);
1404: //
1405: // float runsAvg = (runs / (float)tasks.length);
1406: // int runsMin = Math.round((float)runStats.getMin());
1407: // int runsMinCnt = 0;
1408: // int runsMax = Math.round((float)runStats.getMax());
1409: // int runsMaxCnt = 0;
1410: // int runsRange = (runsMax - runsMin);
1411: // float runsStddev = (float) runStats.getStdDev();
1412: // float runsGMean = (float) runStats.getGeometricMean();
1413: // float runsStddevR = (runsRange / runsStddev);
1414: //
1415: // for (int i = 0; i < tasks.length; i++) {
1416: // double avgPeriod = tasks[i].getAveragePeriod();
1417: //
1418: // if (avgPeriod == periodMin) {
1419: // periodMinCnt++;
1420: // }
1421: //
1422: // if (avgPeriod == periodMax) {
1423: // periodMaxCnt++;
1424: // }
1425: //
1426: // if (tasks[i].runs == runsMin) {
1427: // runsMinCnt++;
1428: // }
1429: //
1430: // if (tasks[i].runs == runsMax) {
1431: // runsMaxCnt++;
1432: // }
1433: // }
1434: //
1435: // System.out.println("------------------------");
1436: // System.out.println("Starved tasks (runs = 0): " + starved + " (" + ((100*starved)/tasks.length) + "%)");
1437: // System.out.println("------------------------");
1438: // System.out.println("Period :");
1439: // System.out.println("------------------------");
1440: // System.out.println("Average : " + periodAvg);
1441: // System.out.println("~Minimum (count/runs) : " + periodMin + " (" + periodMinCnt + "/" + tasks.length + ")");
1442: // System.out.println("~Maximum (count/runs) : " + periodMax + " (" + periodMaxCnt + "/" + tasks.length + ")");
1443: // System.out.println("~Range : " + periodRange);
1444: // System.out.println("Geometric mean : " + periodGMean);
1445: // System.out.println("Stddev : " + periodStddev);
1446: // System.out.println("~Range/Stddev : " + periodStddevR);
1447: // System.out.println("------------------------");
1448: // System.out.println("Runs :");
1449: // System.out.println("------------------------");
1450: // System.out.println("Average : " + runsAvg);
1451: // System.out.println("Minimum (count/runs) : " + runsMin + " (" + runsMinCnt + "/" + tasks.length + ")");
1452: // System.out.println("Maximum (count/runs) : " + runsMax + " (" + runsMaxCnt + "/" + tasks.length + ")");
1453: // System.out.println("Range : " + runsRange);
1454: // System.out.println("Geometric mean : " + runsGMean);
1455: // System.out.println("Stddev : " + runsStddev);
1456: // System.out.println("Range/Stddev : " + runsStddevR);
1457: // }
1458: }
|