001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.util;
031:
032: import java.lang.reflect.Method;
033: import java.util.logging.Level;
034: import java.util.logging.Logger;
035:
036: /**
037: * The alarm class provides a lightweight event scheduler. This allows
038: * an objects to schedule a timeout without creating a new thread.
039: *
040: * <p>A separate thread periodically tests the queue for alarms ready.
041: */
042: public class Alarm implements ThreadTask {
043: static private final Logger log = Logger.getLogger(Alarm.class
044: .getName());
045:
046: static private final Integer timeLock = new Integer(0);
047:
048: static private volatile long _currentTime = System
049: .currentTimeMillis();
050:
051: static private int _concurrentAlarmThrottle = 5;
052:
053: static private Object _queueLock = new Object();
054:
055: static private AlarmThread _alarmThread;
056:
057: static private Alarm[] _heap = new Alarm[256];
058: static private int _heapTop;
059:
060: static private volatile int _runningAlarmCount;
061:
062: static private long _testTime;
063: static private int _testCount;
064:
065: static private final Method _nanoTimeMethod;
066:
067: private long _wakeTime;
068: private AlarmListener _listener;
069: private ClassLoader _contextLoader;
070: private String _name;
071:
072: private int _heapIndex = 0;
073:
074: private volatile boolean _isRunning;
075:
076: static {
077: _currentTime = System.currentTimeMillis();
078:
079: Method nanoTimeMethod;
080:
081: try {
082: nanoTimeMethod = System.class.getMethod("nanoTime", null);
083: } catch (NoSuchMethodException e) {
084: nanoTimeMethod = null;
085: }
086:
087: _nanoTimeMethod = nanoTimeMethod;
088: }
089:
090: /**
091: * Create a new wakeup alarm with a designated listener as a callback.
092: * The alarm is not scheduled.
093: */
094: protected Alarm() {
095: _name = "alarm";
096:
097: init();
098: }
099:
100: /**
101: * Create a new wakeup alarm with a designated listener as a callback.
102: * The alarm is not scheduled.
103: */
104: public Alarm(AlarmListener listener) {
105: this ("alarm[" + listener + "]", listener);
106: }
107:
108: /**
109: * Create a new wakeup alarm with a designated listener as a callback.
110: * The alarm is not scheduled.
111: */
112: public Alarm(String name, AlarmListener listener) {
113: this (name, listener, Thread.currentThread()
114: .getContextClassLoader());
115: }
116:
117: /**
118: * Create a new wakeup alarm with a designated listener as a callback.
119: * The alarm is not scheduled.
120: */
121: public Alarm(String name, AlarmListener listener, ClassLoader loader) {
122: _name = name;
123:
124: setListener(listener);
125: setContextLoader(loader);
126:
127: init();
128: }
129:
130: /**
131: * Create a new wakeup alarm with a designated listener as a callback.
132: * The alarm is not scheduled.
133: */
134: public Alarm(String name, AlarmListener listener, long delta,
135: ClassLoader loader) {
136: _name = name;
137:
138: setListener(listener);
139: setContextLoader(loader);
140:
141: queue(delta);
142:
143: init();
144: }
145:
146: /**
147: * Creates a named alarm and schedules its wakeup.
148: *
149: * @param name the object prepared to receive the callback
150: * @param listener the object prepared to receive the callback
151: * @param delta the time in milliseconds to wake up
152: */
153: public Alarm(String name, AlarmListener listener, long delta) {
154: this (listener);
155:
156: _name = name;
157: queue(delta);
158:
159: init();
160: }
161:
162: /**
163: * Creates a new alarm and schedules its wakeup.
164: *
165: * @param listener the object prepared to receive the callback
166: * @param delta the time in milliseconds to wake up
167: */
168: public Alarm(AlarmListener listener, long delta) {
169: this (listener);
170:
171: queue(delta);
172:
173: init();
174: }
175:
176: private void init() {
177: synchronized (Alarm.class) {
178: if (_alarmThread == null) {
179: _currentTime = System.currentTimeMillis();
180: _alarmThread = new AlarmThread();
181: _alarmThread.start();
182: }
183: }
184: }
185:
186: /**
187: * Returns the alarm name.
188: */
189: public String getName() {
190: return _name;
191: }
192:
193: /**
194: * Sets the alarm name.
195: */
196: protected void setName(String name) {
197: _name = name;
198: }
199:
200: /**
201: * Returns the approximate current time in milliseconds.
202: * Convenient for minimizing system calls.
203: */
204: public static long getCurrentTime() {
205: if (_alarmThread != null)
206: return _currentTime;
207: else
208: return System.currentTimeMillis();
209: }
210:
211: /**
212: * Returns the exact current time in milliseconds.
213: */
214: public static long getExactTime() {
215: if (_testTime > 0)
216: return _testTime;
217: else
218: return System.currentTimeMillis();
219: }
220:
221: /**
222: * Returns the exact current time in nanoseconds.
223: */
224: public static long getExactTimeNanoseconds() {
225: if (_testTime > 0)
226: return _testTime * 1000000L;
227:
228: return System.currentTimeMillis() * 1000000L;
229: }
230:
231: /**
232: * Returns true for testing.
233: */
234: public static boolean isTest() {
235: return _testTime > 0;
236: }
237:
238: /**
239: * Returns the wake time of this alarm.
240: */
241: public long getWakeTime() {
242: return _wakeTime;
243: }
244:
245: /**
246: * Return the alarm's listener.
247: */
248: public AlarmListener getListener() {
249: return _listener;
250: }
251:
252: /**
253: * Sets the alarm's listener.
254: */
255: public void setListener(AlarmListener listener) {
256: _listener = listener;
257: }
258:
259: /**
260: * Sets the alarm's context loader
261: */
262: public void setContextLoader(ClassLoader loader) {
263: _contextLoader = loader;
264: }
265:
266: /**
267: * Sets the alarm's context loader
268: */
269: public ClassLoader getContextLoader() {
270: return _contextLoader;
271: }
272:
273: /**
274: * Returns true if the alarm is currently queued.
275: */
276: public boolean isQueued() {
277: return _heapIndex != 0;
278: }
279:
280: /**
281: * Returns true if the alarm is currently running
282: */
283: boolean isRunning() {
284: return _isRunning;
285: }
286:
287: /**
288: * Queue the alarm for wakeup.
289: *
290: * @param delta time in milliseconds to wake
291: */
292: public void queue(long delta) {
293: synchronized (_queueLock) {
294: if (_heapIndex > 0)
295: dequeueImpl(this );
296:
297: long wakeTime = delta + getCurrentTime();
298: _wakeTime = wakeTime;
299:
300: insertImpl(this );
301: }
302: }
303:
304: /**
305: * Remove the alarm from the wakeup queue.
306: */
307: public void dequeue() {
308: synchronized (_queueLock) {
309: if (_heapIndex > 0)
310: dequeueImpl(this );
311: }
312: }
313:
314: /**
315: * Runs the alarm. This is only called from the worker thread.
316: */
317: public void run() {
318: try {
319: handleAlarm();
320: } catch (Throwable e) {
321: log.log(Level.WARNING, e.toString(), e);
322: } finally {
323: synchronized (_queueLock) {
324: _isRunning = false;
325: _runningAlarmCount--;
326: }
327: }
328: }
329:
330: /**
331: * Handles the alarm.
332: */
333: private void handleAlarm() {
334: AlarmListener listener = getListener();
335:
336: if (listener == null)
337: return;
338:
339: Thread thread = Thread.currentThread();
340: ClassLoader loader = getContextLoader();
341:
342: if (loader != null)
343: thread.setContextClassLoader(loader);
344: else
345: thread.setContextClassLoader(ClassLoader
346: .getSystemClassLoader());
347:
348: try {
349: listener.handleAlarm(this );
350: } finally {
351: thread.setContextClassLoader(ClassLoader
352: .getSystemClassLoader());
353: }
354: }
355:
356: /**
357: * Closes the alarm instance
358: */
359: public void close() {
360: dequeue();
361: _listener = null;
362: _contextLoader = null;
363: }
364:
365: /**
366: * Returns the next alarm ready to run
367: */
368: static Alarm extractAlarm() {
369: long now = getCurrentTime();
370:
371: synchronized (_queueLock) {
372: Alarm[] heap = _heap;
373:
374: Alarm alarm = heap[1];
375:
376: if (alarm == null)
377: return null;
378: else if (now < alarm._wakeTime)
379: return null;
380:
381: dequeueImpl(alarm);
382:
383: return alarm;
384: }
385: }
386:
387: /**
388: * Removes the alarm item. Must be called from within the heap lock.
389: */
390: private static void insertImpl(Alarm item) {
391: if (item._heapIndex != 0)
392: throw new IllegalStateException();
393:
394: // resize if necessary
395: if (_heap.length <= _heapTop + 2) {
396: Alarm[] newHeap = new Alarm[2 * _heap.length];
397: System.arraycopy(_heap, 0, newHeap, 0, _heap.length);
398: _heap = newHeap;
399: }
400:
401: Alarm[] heap = _heap;
402:
403: int i = ++_heapTop;
404: int parent;
405: Alarm alarm;
406: long wakeTime = item._wakeTime;
407:
408: while (i > 1
409: && wakeTime < (alarm = heap[parent = (i >> 1)])._wakeTime) {
410: heap[i] = alarm;
411: alarm._heapIndex = i;
412: i = parent;
413: }
414:
415: heap[i] = item;
416: item._heapIndex = i;
417:
418: if (_heapTop < i)
419: throw new IllegalStateException();
420: }
421:
422: /**
423: * Removes the alarm item. Must be called from within the heap lock.
424: */
425: private static void dequeueImpl(Alarm item) {
426: int i = item._heapIndex;
427:
428: if (i < 1)
429: return;
430:
431: if (_heapTop < i)
432: throw new IllegalStateException("bad heap: " + _heapTop
433: + " index:" + i);
434:
435: Alarm[] heap = _heap;
436:
437: if (_heapTop < 1)
438: throw new IllegalStateException();
439:
440: int size = _heapTop--;
441:
442: heap[i] = heap[size];
443: heap[i]._heapIndex = i;
444: heap[size] = null;
445:
446: item._heapIndex = 0;
447:
448: if (size == i)
449: return;
450:
451: if (item._wakeTime < heap[i]._wakeTime) {
452: while (i < size) {
453: item = heap[i];
454:
455: int minIndex = i;
456: long minWakeTime = item._wakeTime;
457:
458: int left = i << 1;
459: if (left < size && heap[left]._wakeTime < minWakeTime) {
460: minIndex = left;
461: minWakeTime = heap[left]._wakeTime;
462: }
463:
464: int right = left + 1;
465: if (right < size && heap[right]._wakeTime < minWakeTime)
466: minIndex = right;
467:
468: if (i == minIndex)
469: return;
470:
471: heap[i] = heap[minIndex];
472: heap[i]._heapIndex = i;
473: heap[minIndex] = item;
474: item._heapIndex = minIndex;
475:
476: i = minIndex;
477: }
478: } else {
479: int parent;
480: Alarm alarm;
481: item = heap[i];
482: long wakeTime = item._wakeTime;
483:
484: while (i > 1
485: && wakeTime < (alarm = heap[parent = (i >> 1)])._wakeTime) {
486: heap[i] = alarm;
487: alarm._heapIndex = i;
488: i = parent;
489: }
490:
491: heap[i] = item;
492: item._heapIndex = i;
493: }
494: }
495:
496: // test
497:
498: static void testClear() {
499: for (; _heapTop > 0; _heapTop--) {
500: Alarm alarm = _heap[_heapTop];
501: alarm._heapIndex = 0;
502: _heap[_heapTop] = null;
503: }
504: }
505:
506: static void setTestTime(long time) {
507: _testTime = time;
508:
509: if (_testTime > 0) {
510: if (time < _currentTime) {
511: testClear();
512: }
513:
514: _currentTime = time;
515: } else
516: _currentTime = System.currentTimeMillis();
517:
518: Alarm alarm;
519:
520: Thread thread = Thread.currentThread();
521: ClassLoader oldLoader = thread.getContextClassLoader();
522:
523: try {
524: while ((alarm = Alarm.extractAlarm()) != null) {
525: alarm.run();
526: }
527: } finally {
528: thread.setContextClassLoader(oldLoader);
529: }
530:
531: try {
532: Thread.currentThread().sleep(10);
533: } catch (Exception e) {
534: }
535: }
536:
537: public String toString() {
538: return "Alarm[" + _name + "]";
539: }
540:
541: static class AlarmThread extends Thread {
542: private CoordinatorTask _coordinator = new CoordinatorTask();
543:
544: public void run() {
545: while (true) {
546: try {
547: if (_testTime > 0)
548: _currentTime = _testTime;
549: else
550: _currentTime = System.currentTimeMillis();
551:
552: _coordinator.schedule();
553:
554: Thread.sleep(500);
555: } catch (Throwable e) {
556: }
557: }
558: }
559:
560: AlarmThread() {
561: super ("resin-alarm");
562: setDaemon(true);
563: }
564: }
565:
566: private static class CoordinatorTask implements ThreadTask {
567: private boolean _isRunning;
568:
569: /**
570: * schedules the task.
571: */
572: void schedule() {
573: boolean isRunning;
574:
575: synchronized (this ) {
576: isRunning = _isRunning;
577: _isRunning = true;
578: }
579:
580: if (!isRunning)
581: ThreadPool.getThreadPool().schedulePriority(this );
582: }
583:
584: /**
585: * Runs the coordinator task.
586: */
587: public void run() {
588: try {
589: Thread thread = Thread.currentThread();
590: // String oldName = thread.getName();
591:
592: // thread.setName("alarm-coordinator");
593:
594: Alarm alarm;
595:
596: while ((alarm = Alarm.extractAlarm()) != null) {
597: // throttle alarm invocations by 5ms so quick alarms don't need
598: // extra threads
599: if (_concurrentAlarmThrottle < _runningAlarmCount) {
600: try {
601: Thread.sleep(5);
602: } catch (Throwable e) {
603: }
604: }
605:
606: ThreadPool.getThreadPool().startPriority(alarm);
607: }
608:
609: // thread.setName(oldName);
610: } finally {
611: _isRunning = false;
612: }
613: }
614: }
615: }
|