001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.server.util;
031:
032: import java.util.*;
033: import java.util.logging.*;
034: import java.util.concurrent.*;
035:
036: import com.caucho.loader.*;
037: import com.caucho.util.*;
038: import com.caucho.webbeans.component.WebBeansHandle;
039:
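/**
 * A ScheduledExecutorService scoped to the current environment class loader.
 * Submitted tasks are handed to the shared ThreadPool, delayed and periodic
 * tasks are triggered through Alarm, and all outstanding futures are
 * cancelled when the environment stops.
 */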
044: public class ScheduledThreadPool implements ScheduledExecutorService,
045: EnvironmentListener, java.io.Serializable {
046: private static Logger log = Logger
047: .getLogger(ScheduledThreadPool.class.getName());
048: private static L10N L = new L10N(ScheduledThreadPool.class);
049:
050: private static EnvironmentLocal<ScheduledThreadPool> _local = new EnvironmentLocal<ScheduledThreadPool>();
051:
052: private ThreadPool _threadPool;
053:
054: private boolean _isShutdown;
055: private boolean _isTerminated;
056:
057: private ClassLoader _loader;
058:
059: private final HashSet<Future> _futureSet = new HashSet<Future>();
060:
061: private ScheduledThreadPool() {
062: _loader = Thread.currentThread().getContextClassLoader();
063: _threadPool = ThreadPool.getThreadPool();
064:
065: Environment.addEnvironmentListener(this );
066: }
067:
068: public static ScheduledThreadPool getLocal() {
069: synchronized (_local) {
070: ScheduledThreadPool pool = _local.getLevel();
071:
072: if (pool == null) {
073: pool = new ScheduledThreadPool();
074: _local.set(pool);
075: }
076:
077: return pool;
078: }
079: }
080:
081: //
082: // Executor
083: //
084:
085: /**
086: * Launches a thread to execute a command.
087: */
088: public void execute(Runnable command) {
089: if (_isShutdown)
090: throw new IllegalStateException("ThreadPool has closed");
091:
092: TaskFuture future = new TaskFuture(_loader, command, null);
093:
094: synchronized (_futureSet) {
095: _futureSet.add(future);
096:
097: _threadPool.scheduleExecutorTask(future);
098: }
099: }
100:
101: //
102: // ExecutorService
103: //
104:
105: /**
106: * Blocks until the tasks complete.
107: */
108: public boolean awaitTermination(long timeout, TimeUnit unit) {
109: throw new UnsupportedOperationException();
110: }
111:
112: /**
113: * Invokes a set of tasks.
114: */
115: public List invokeAll(Collection tasks) {
116: throw new UnsupportedOperationException();
117: }
118:
119: /**
120: * Invokes a set of tasks.
121: */
122: public List invokeAll(Collection tasks, long timeout, TimeUnit unit) {
123: // XXX: todo
124: throw new UnsupportedOperationException();
125: }
126:
127: /**
128: * Invokes a set of tasks.
129: */
130: public Object invokeAny(Collection tasks) {
131: // XXX: todo
132: throw new UnsupportedOperationException();
133: }
134:
135: /**
136: * Invokes a set of tasks.
137: */
138: public Object invokeAny(Collection tasks, long timeout,
139: TimeUnit unit) {
140: // XXX: todo
141: throw new UnsupportedOperationException();
142: }
143:
144: /**
145: * Return true if the executor is shut down.
146: */
147: public boolean isShutdown() {
148: return _isShutdown;
149: }
150:
151: /**
152: * Return true if the executor has completed shutting down.
153: */
154: public boolean isTerminated() {
155: return _isTerminated;
156: }
157:
158: /**
159: * Starts the shutdown.
160: */
161: public void shutdown() {
162: throw new UnsupportedOperationException();
163: }
164:
165: /**
166: * Starts the shutdown.
167: */
168: public List<Runnable> shutdownNow() {
169: throw new UnsupportedOperationException();
170: }
171:
172: /**
173: * Submits a task for execution.
174: */
175: public <T> Future<T> submit(Callable<T> task) {
176: if (_isShutdown)
177: throw new IllegalStateException("ThreadPool has closed");
178:
179: TaskFuture<T> future = new TaskFuture<T>(_loader, task);
180:
181: synchronized (_futureSet) {
182: _futureSet.add(future);
183:
184: _threadPool.scheduleExecutorTask(future);
185: }
186:
187: return future;
188: }
189:
190: /**
191: * Submits a task for execution.
192: */
193: public Future<?> submit(Runnable command) {
194: if (_isShutdown)
195: throw new IllegalStateException(L
196: .l("Can't submit after ThreadPool has closed"));
197:
198: TaskFuture future = new TaskFuture(_loader, command, null);
199:
200: synchronized (_futureSet) {
201: _futureSet.add(future);
202:
203: _threadPool.scheduleExecutorTask(future);
204: }
205:
206: return future;
207: }
208:
209: /**
210: * Submits a task for execution.
211: */
212: public <T> Future<T> submit(Runnable task, T result) {
213: if (_isShutdown)
214: throw new IllegalStateException(L
215: .l("Can't submit after ThreadPool has closed"));
216:
217: TaskFuture<T> future = new TaskFuture<T>(_loader, task, result);
218:
219: synchronized (_futureSet) {
220: _futureSet.add(future);
221:
222: _threadPool.scheduleExecutorTask(future);
223: }
224:
225: return future;
226: }
227:
228: //
229: // ScheduledExecutorService
230: //
231:
232: /**
233: * Schedules a future task.
234: */
235: public <V> ScheduledFuture<V> schedule(Callable<V> callable,
236: long delay, TimeUnit unit) {
237: if (_isShutdown)
238: throw new IllegalStateException(L
239: .l("Can't submit after ThreadPool has closed"));
240:
241: long initialExpires = Alarm.getCurrentTime()
242: + unit.toMillis(delay);
243:
244: AlarmFuture future = new AlarmFuture(_loader, callable,
245: initialExpires, 0, 0);
246:
247: synchronized (_futureSet) {
248: _futureSet.add(future);
249: }
250:
251: future.queue();
252:
253: return future;
254: }
255:
256: /**
257: * Schedules a future task.
258: */
259: public ScheduledFuture<?> schedule(Runnable command, long delay,
260: TimeUnit unit) {
261: if (_isShutdown)
262: throw new IllegalStateException(L
263: .l("Can't submit after ThreadPool has closed"));
264:
265: long initialExpires = Alarm.getCurrentTime()
266: + unit.toMillis(delay);
267:
268: AlarmFuture future = new AlarmFuture(_loader, command,
269: initialExpires, 0, 0);
270:
271: synchronized (_futureSet) {
272: _futureSet.add(future);
273: }
274:
275: future.queue();
276:
277: return future;
278: }
279:
280: /**
281: * Schedules a future task.
282: */
283: public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
284: long initialDelay, long period, TimeUnit unit) {
285: if (_isShutdown)
286: throw new IllegalStateException(L
287: .l("Can't submit after ThreadPool has closed"));
288:
289: long initialExpires = Alarm.getCurrentTime()
290: + unit.toMillis(initialDelay);
291:
292: AlarmFuture future = new AlarmFuture(_loader, command,
293: initialExpires, unit.toMillis(period), 0);
294:
295: synchronized (_futureSet) {
296: _futureSet.add(future);
297: }
298:
299: future.queue();
300:
301: return future;
302: }
303:
304: /**
305: * Schedules with fixed delay
306: */
307: public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
308: long initialDelay, long delay, TimeUnit unit) {
309: if (_isShutdown)
310: throw new IllegalStateException(L
311: .l("Can't submit after ThreadPool has closed"));
312:
313: long initialExpires = Alarm.getCurrentTime()
314: + unit.toMillis(initialDelay);
315:
316: AlarmFuture future = new AlarmFuture(_loader, command,
317: initialExpires, 0, unit.toMillis(delay));
318:
319: synchronized (_futureSet) {
320: _futureSet.add(future);
321: }
322:
323: future.queue();
324:
325: return future;
326: }
327:
328: //
329: // Timer
330: //
331:
332: /**
333: * Returns the Timer for this pool.
334: */
335: public Timer getTimer() {
336: throw new UnsupportedOperationException();
337: }
338:
339: //
340: // lifecycle
341: //
342:
343: /**
344: * Stops the pool on environment shutdown.
345: */
346: private void stop() {
347: _isShutdown = true;
348:
349: while (true) {
350: Future future = null;
351:
352: synchronized (_futureSet) {
353: Iterator<Future> iter = _futureSet.iterator();
354:
355: if (iter.hasNext()) {
356: future = iter.next();
357:
358: _futureSet.remove(future);
359: }
360: }
361:
362: if (future == null)
363: break;
364:
365: future.cancel(true);
366: }
367: }
368:
369: void removeFuture(Future future) {
370: synchronized (_futureSet) {
371: _futureSet.remove(future);
372: }
373: }
374:
375: //
376: // Environment callbacks.
377: //
378:
379: /**
380: * Called when the environment config phase
381: */
382: public void environmentConfig(EnvironmentClassLoader loader) {
383: }
384:
385: /**
386: * Called when the environment starts.
387: */
388: public void environmentStart(EnvironmentClassLoader loader) {
389: }
390:
391: /**
392: * Called when the environment stops.
393: */
394: public void environmentStop(EnvironmentClassLoader loader) {
395: stop();
396: }
397:
398: /**
399: * Serialize to a webbeans handle
400: */
401: public Object writeReplace() {
402: return new WebBeansHandle(ScheduledExecutorService.class);
403: }
404:
405: @Override
406: public String toString() {
407: if (_loader instanceof EnvironmentClassLoader)
408: return "ScheduledThreadPool["
409: + ((EnvironmentClassLoader) _loader).getId() + "]";
410: else
411: return "ScheduledThreadPool[" + _loader + "]";
412: }
413:
414: class TaskFuture<T> implements Future<T>, Runnable {
415: private final ClassLoader _loader;
416: private final Callable<T> _callable;
417: private final Runnable _runnable;
418:
419: private Thread _thread;
420:
421: private boolean _isCancelled;
422: private boolean _isDone;
423:
424: private Exception _exception;
425: private T _value;
426:
427: TaskFuture(ClassLoader loader, Callable<T> callable) {
428: _loader = loader;
429: _callable = callable;
430: _runnable = null;
431: }
432:
433: TaskFuture(ClassLoader loader, Runnable runnable, T result) {
434: _loader = loader;
435: _callable = null;
436: _runnable = runnable;
437: _value = result;
438: }
439:
440: public boolean isCancelled() {
441: return _isCancelled;
442: }
443:
444: public boolean isDone() {
445: return _isDone;
446: }
447:
448: public boolean cancel(boolean mayInterrupt) {
449: synchronized (this ) {
450: removeFuture(this );
451:
452: if (_isCancelled || _isDone)
453: return false;
454:
455: _isCancelled = true;
456:
457: notifyAll();
458: }
459:
460: Thread thread = _thread;
461:
462: if (mayInterrupt && thread != null)
463: thread.interrupt();
464:
465: return true;
466: }
467:
468: public T get() throws InterruptedException, ExecutionException {
469: try {
470: return get(Long.MAX_VALUE / 2, TimeUnit.MILLISECONDS);
471: } catch (TimeoutException e) {
472: throw new IllegalStateException(e);
473: }
474: }
475:
476: public T get(long timeout, TimeUnit unit)
477: throws InterruptedException, ExecutionException,
478: TimeoutException {
479: long expire = Alarm.getCurrentTime()
480: + unit.toMillis(timeout);
481:
482: synchronized (this ) {
483: while (!_isDone && !_isCancelled
484: && Alarm.getCurrentTime() < expire
485: && !Thread.currentThread().isInterrupted()) {
486: if (!Alarm.isTest())
487: wait(expire - Alarm.getCurrentTime());
488: else {
489: wait(1000);
490: break;
491: }
492: }
493: }
494:
495: if (_exception != null)
496: throw new ExecutionException(_exception);
497: else if (_isDone)
498: return _value;
499: else if (_isCancelled)
500: throw new CancellationException();
501: else
502: throw new TimeoutException();
503: }
504:
505: public void run() {
506: _thread = Thread.currentThread();
507: ClassLoader oldLoader = _thread.getContextClassLoader();
508:
509: try {
510: if (_isCancelled || _isDone || _isShutdown)
511: return;
512:
513: _thread.setContextClassLoader(_loader);
514:
515: if (_callable != null)
516: _value = _callable.call();
517: else
518: _runnable.run();
519: } catch (RuntimeException e) {
520: throw e;
521: } catch (Exception e) {
522: _exception = e;
523: } finally {
524: _thread.setContextClassLoader(oldLoader);
525: _thread = null;
526: _isDone = true;
527:
528: _threadPool.completeExecutorTask();
529:
530: // alarm
531:
532: removeFuture(this );
533:
534: synchronized (this ) {
535: notifyAll();
536: }
537: }
538: }
539:
540: public String toString() {
541: Object task = _callable != null ? _callable : _runnable;
542:
543: if (_isDone)
544: return "TaskFuture[" + task + ",done]";
545: else if (_thread != null) {
546: if (Alarm.isTest())
547: return "TaskFuture[" + task + ",active]";
548: else
549: return "TaskFuture[" + task + "," + _thread + "]";
550: } else if (_isCancelled)
551: return "TaskFuture[" + task + ",cancelled]";
552: else
553: return "TaskFuture[" + task + ",pending]";
554: }
555: }
556:
557: class AlarmFuture<T> implements ScheduledFuture<T>, AlarmListener {
558: private final String _name;
559:
560: private final ClassLoader _loader;
561: private final Callable<T> _callable;
562: private final Runnable _runnable;
563:
564: private final Alarm _alarm;
565:
566: private final long _initialExpires;
567: private final long _period;
568: private final long _delay;
569:
570: private long _nextTime;
571:
572: private Thread _thread;
573:
574: private boolean _isCancelled;
575: private boolean _isDone;
576: private int _alarmCount;
577:
578: private Exception _exception;
579: private T _value;
580:
581: AlarmFuture(ClassLoader loader, Callable<T> callable,
582: long initialExpires, long period, long delay) {
583: _name = "Scheduled[" + callable + "]";
584:
585: _loader = loader;
586: _callable = callable;
587: _runnable = null;
588:
589: _initialExpires = initialExpires;
590: _period = period;
591: _delay = delay;
592: _nextTime = initialExpires;
593:
594: _alarm = new Alarm(_name, this , loader);
595: }
596:
597: AlarmFuture(ClassLoader loader, Runnable runnable,
598: long initialExpires, long period, long delay) {
599: _name = "Scheduled[" + runnable + "]";
600:
601: _loader = loader;
602: _callable = null;
603: _runnable = runnable;
604:
605: _initialExpires = initialExpires;
606: _period = period;
607: _delay = delay;
608:
609: _alarm = new Alarm(_name, this , loader);
610: }
611:
612: void queue() {
613: _alarm.queue(_initialExpires - Alarm.getCurrentTime());
614: }
615:
616: public boolean isCancelled() {
617: return _isCancelled;
618: }
619:
620: public boolean isDone() {
621: return _isDone;
622: }
623:
624: public long getDelay(TimeUnit unit) {
625: long delay = _nextTime - Alarm.getCurrentTime();
626:
627: return TimeUnit.MILLISECONDS.convert(delay, unit);
628: }
629:
630: public int compareTo(Delayed b) {
631: long delta = (getDelay(TimeUnit.MILLISECONDS) - b
632: .getDelay(TimeUnit.MILLISECONDS));
633:
634: if (delta < 0)
635: return -1;
636: else if (delta > 0)
637: return 1;
638: else
639: return 0;
640: }
641:
642: public boolean cancel(boolean mayInterrupt) {
643: synchronized (this ) {
644: if (_isCancelled || _isDone)
645: return false;
646:
647: _isCancelled = true;
648:
649: _alarm.dequeue();
650:
651: notifyAll();
652: }
653:
654: removeFuture(this );
655:
656: Thread thread = _thread;
657:
658: if (mayInterrupt && thread != null)
659: thread.interrupt();
660:
661: return true;
662: }
663:
664: public T get() throws InterruptedException, ExecutionException {
665: try {
666: return get(Long.MAX_VALUE / 2, TimeUnit.MILLISECONDS);
667: } catch (TimeoutException e) {
668: throw new IllegalStateException(e);
669: }
670: }
671:
672: public T get(long timeout, TimeUnit unit)
673: throws InterruptedException, ExecutionException,
674: TimeoutException {
675: long expire = Alarm.getCurrentTime()
676: + unit.toMillis(timeout);
677: int count = _alarmCount;
678:
679: while (!_isDone && !_isCancelled && count == _alarmCount
680: && Alarm.getCurrentTime() < expire
681: && !Thread.currentThread().isInterrupted()) {
682: synchronized (this ) {
683: wait(expire - Alarm.getCurrentTime());
684: }
685: }
686:
687: if (_exception != null)
688: throw new ExecutionException(_exception);
689: else if (_isDone || count != _alarmCount)
690: return _value;
691: else if (_isCancelled)
692: throw new CancellationException();
693: else
694: throw new TimeoutException();
695: }
696:
697: public void handleAlarm(Alarm alarm) {
698: if (_isCancelled || _isDone || _isShutdown)
699: return;
700:
701: _thread = Thread.currentThread();
702: ClassLoader oldLoader = _thread.getContextClassLoader();
703: String oldName = _thread.getName();
704:
705: try {
706: _thread.setContextClassLoader(_loader);
707: _thread.setName(_name);
708:
709: if (_callable != null)
710: _value = _callable.call();
711: else
712: _runnable.run();
713: } catch (Exception e) {
714: log.log(Level.FINE, e.toString(), e);
715:
716: _exception = e;
717: _isCancelled = true;
718: } finally {
719: _thread.setContextClassLoader(oldLoader);
720: _thread.setName(oldName);
721: _thread = null;
722:
723: synchronized (this ) {
724: _alarmCount++;
725:
726: if (_isCancelled || _isDone) {
727: removeFuture(this );
728: } else if (_delay > 0) {
729: _nextTime = Alarm.getCurrentTime() + _delay;
730:
731: _alarm.queue(_delay);
732: } else if (_period > 0) {
733: long now = Alarm.getCurrentTime();
734: long time = now - _initialExpires;
735: long modTime = time % _period;
736:
737: if (modTime > 0) {
738: long delta = (_period - modTime);
739:
740: _nextTime = now + delta;
741:
742: _alarm.queue(delta);
743: } else {
744: _nextTime = now + _period;
745:
746: _alarm.queue(_period);
747: }
748: } else {
749: _isDone = true;
750: removeFuture(this );
751: }
752:
753: notifyAll();
754: }
755: }
756: }
757:
758: public String toString() {
759: Object task = _callable != null ? _callable : _runnable;
760:
761: if (_isDone)
762: return "AlarmFuture[" + task + ",done]";
763: else if (_thread != null) {
764: if (Alarm.isTest())
765: return "AlarmFuture[" + task + ",active]";
766: else
767: return "AlarmFuture[" + task + "," + _thread + "]";
768: } else if (_isCancelled)
769: return "AlarmFuture[" + task + ",cancelled]";
770: else
771: return "AlarmFuture[" + task + ",pending]";
772: }
773: }
774: }
|