001: /*
002: * Distributed as part of c3p0 v.0.9.1.2
003: *
004: * Copyright (C) 2005 Machinery For Change, Inc.
005: *
006: * Author: Steve Waldman <swaldman@mchange.com>
007: *
008: * This library is free software; you can redistribute it and/or modify
009: * it under the terms of the GNU Lesser General Public License version 2.1, as
010: * published by the Free Software Foundation.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
015: * GNU Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public License
018: * along with this software; see the file LICENSE. If not, write to the
019: * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
020: * Boston, MA 02111-1307, USA.
021: */
022:
023: package com.mchange.v2.async;
024:
025: import java.util.*;
026: import com.mchange.v2.log.*;
027:
028: import java.io.StringWriter;
029: import java.io.PrintWriter;
030: import java.io.IOException;
031: import java.lang.reflect.Method;
032: import com.mchange.v2.io.IndentedWriter;
033: import com.mchange.v2.util.ResourceClosedException;
034:
035: public final class ThreadPoolAsynchronousRunner implements
036: AsynchronousRunner {
037: final static MLogger logger = MLog
038: .getLogger(ThreadPoolAsynchronousRunner.class);
039:
040: final static int POLL_FOR_STOP_INTERVAL = 5000; //milliseconds
041:
042: final static int DFLT_DEADLOCK_DETECTOR_INTERVAL = 10000; //milliseconds
043: final static int DFLT_INTERRUPT_DELAY_AFTER_APPARENT_DEADLOCK = 60000; //milliseconds
044: final static int DFLT_MAX_INDIVIDUAL_TASK_TIME = 0; //milliseconds, <= 0 means don't enforce a max task time
045:
046: final static int DFLT_MAX_EMERGENCY_THREADS = 10;
047:
048: int deadlock_detector_interval;
049: int interrupt_delay_after_apparent_deadlock;
050: int max_individual_task_time;
051:
052: int num_threads;
053: boolean daemon;
054: HashSet managed;
055: HashSet available;
056: LinkedList pendingTasks;
057:
058: Timer myTimer;
059: boolean should_cancel_timer;
060:
061: TimerTask deadlockDetector = new DeadlockDetector();
062: TimerTask replacedThreadInterruptor = null;
063:
064: Map stoppedThreadsToStopDates = new HashMap();
065:
066: private ThreadPoolAsynchronousRunner(int num_threads,
067: boolean daemon, int max_individual_task_time,
068: int deadlock_detector_interval,
069: int interrupt_delay_after_apparent_deadlock, Timer myTimer,
070: boolean should_cancel_timer) {
071: this .num_threads = num_threads;
072: this .daemon = daemon;
073: this .max_individual_task_time = max_individual_task_time;
074: this .deadlock_detector_interval = deadlock_detector_interval;
075: this .interrupt_delay_after_apparent_deadlock = interrupt_delay_after_apparent_deadlock;
076: this .myTimer = myTimer;
077: this .should_cancel_timer = should_cancel_timer;
078:
079: recreateThreadsAndTasks();
080:
081: myTimer.schedule(deadlockDetector, deadlock_detector_interval,
082: deadlock_detector_interval);
083:
084: }
085:
086: public ThreadPoolAsynchronousRunner(int num_threads,
087: boolean daemon, int max_individual_task_time,
088: int deadlock_detector_interval,
089: int interrupt_delay_after_apparent_deadlock, Timer myTimer) {
090: this (num_threads, daemon, max_individual_task_time,
091: deadlock_detector_interval,
092: interrupt_delay_after_apparent_deadlock, myTimer, false);
093: }
094:
095: public ThreadPoolAsynchronousRunner(int num_threads,
096: boolean daemon, int max_individual_task_time,
097: int deadlock_detector_interval,
098: int interrupt_delay_after_apparent_deadlock) {
099: this (num_threads, daemon, max_individual_task_time,
100: deadlock_detector_interval,
101: interrupt_delay_after_apparent_deadlock,
102: new Timer(true), true);
103: }
104:
105: public ThreadPoolAsynchronousRunner(int num_threads,
106: boolean daemon, Timer sharedTimer) {
107: this (num_threads, daemon, DFLT_MAX_INDIVIDUAL_TASK_TIME,
108: DFLT_DEADLOCK_DETECTOR_INTERVAL,
109: DFLT_INTERRUPT_DELAY_AFTER_APPARENT_DEADLOCK,
110: sharedTimer, false);
111: }
112:
113: public ThreadPoolAsynchronousRunner(int num_threads, boolean daemon) {
114: this (num_threads, daemon, DFLT_MAX_INDIVIDUAL_TASK_TIME,
115: DFLT_DEADLOCK_DETECTOR_INTERVAL,
116: DFLT_INTERRUPT_DELAY_AFTER_APPARENT_DEADLOCK,
117: new Timer(true), true);
118: }
119:
120: public synchronized void postRunnable(Runnable r) {
121: try {
122: pendingTasks.add(r);
123: this .notifyAll();
124: } catch (NullPointerException e) {
125: //e.printStackTrace();
126: if (Debug.DEBUG) {
127: if (logger.isLoggable(MLevel.FINE))
128: logger
129: .log(
130: MLevel.FINE,
131: "NullPointerException while posting Runnable -- Probably we're closed.",
132: e);
133: }
134: throw new ResourceClosedException(
135: "Attempted to use a ThreadPoolAsynchronousRunner in a closed or broken state.");
136: }
137: }
138:
139: public synchronized int getThreadCount() {
140: return managed.size();
141: }
142:
143: public void close(boolean skip_remaining_tasks) {
144: synchronized (this ) {
145: if (managed == null)
146: return;
147: deadlockDetector.cancel();
148: //replacedThreadInterruptor.cancel();
149: if (should_cancel_timer)
150: myTimer.cancel();
151: myTimer = null;
152: for (Iterator ii = managed.iterator(); ii.hasNext();) {
153: PoolThread stopMe = (PoolThread) ii.next();
154: stopMe.gentleStop();
155: if (skip_remaining_tasks)
156: stopMe.interrupt();
157: }
158: managed = null;
159:
160: if (!skip_remaining_tasks) {
161: for (Iterator ii = pendingTasks.iterator(); ii
162: .hasNext();) {
163: Runnable r = (Runnable) ii.next();
164: new Thread(r).start();
165: ii.remove();
166: }
167: }
168: available = null;
169: pendingTasks = null;
170: }
171: }
172:
173: public void close() {
174: close(true);
175: }
176:
177: public synchronized int getActiveCount() {
178: return managed.size() - available.size();
179: }
180:
181: public synchronized int getIdleCount() {
182: return available.size();
183: }
184:
185: public synchronized int getPendingTaskCount() {
186: return pendingTasks.size();
187: }
188:
189: public synchronized String getStatus() {
190: /*
191: StringBuffer sb = new StringBuffer( 512 );
192: sb.append( this.toString() );
193: sb.append( ' ' );
194: appendStatusString( sb );
195: return sb.toString();
196: */
197:
198: return getMultiLineStatusString();
199: }
200:
201: // done reflectively for jdk 1.3/1.4 compatability
202: public synchronized String getStackTraces() {
203: return getStackTraces(0);
204: }
205:
206: // protected by ThreadPoolAsynchronousRunner.this' lock
207: // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
208: private String getStackTraces(int initial_indent) {
209: if (managed == null)
210: return null;
211:
212: try {
213: Method m = Thread.class.getMethod("getStackTrace", null);
214:
215: StringWriter sw = new StringWriter(2048);
216: IndentedWriter iw = new IndentedWriter(sw);
217: for (int i = 0; i < initial_indent; ++i)
218: iw.upIndent();
219: for (Iterator ii = managed.iterator(); ii.hasNext();) {
220: Object poolThread = ii.next();
221: Object[] stackTraces = (Object[]) m.invoke(poolThread,
222: null);
223: iw.println(poolThread);
224: iw.upIndent();
225: for (int i = 0, len = stackTraces.length; i < len; ++i)
226: iw.println(stackTraces[i]);
227: iw.downIndent();
228: }
229: for (int i = 0; i < initial_indent; ++i)
230: iw.downIndent();
231: iw.flush(); // useless, but I feel better
232: String out = sw.toString();
233: iw.close(); // useless, but I feel better;
234: return out;
235: } catch (NoSuchMethodException e) {
236: if (logger.isLoggable(MLevel.FINE))
237: logger
238: .fine(this
239: + ": strack traces unavailable because this is a pre-Java 1.5 VM.");
240: return null;
241: } catch (Exception e) {
242: if (logger.isLoggable(MLevel.FINE))
243: logger
244: .log(
245: MLevel.FINE,
246: this
247: + ": An Exception occurred while trying to extract PoolThread stack traces.",
248: e);
249: return null;
250: }
251: }
252:
253: public synchronized String getMultiLineStatusString() {
254: return this .getMultiLineStatusString(0);
255: }
256:
257: // protected by ThreadPoolAsynchronousRunner.this' lock
258: // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
259: private String getMultiLineStatusString(int initial_indent) {
260: try {
261: StringWriter sw = new StringWriter(2048);
262: IndentedWriter iw = new IndentedWriter(sw);
263:
264: for (int i = 0; i < initial_indent; ++i)
265: iw.upIndent();
266:
267: if (managed == null) {
268: iw.print("[");
269: iw.print(this );
270: iw.println(" closed.]");
271: } else {
272: HashSet active = (HashSet) managed.clone();
273: active.removeAll(available);
274:
275: iw.print("Managed Threads: ");
276: iw.println(managed.size());
277: iw.print("Active Threads: ");
278: iw.println(active.size());
279: iw.println("Active Tasks: ");
280: iw.upIndent();
281: for (Iterator ii = active.iterator(); ii.hasNext();) {
282: PoolThread pt = (PoolThread) ii.next();
283: iw.print(pt.getCurrentTask());
284: iw.print(" (");
285: iw.print(pt.getName());
286: iw.println(')');
287: }
288: iw.downIndent();
289: iw.println("Pending Tasks: ");
290: iw.upIndent();
291: for (int i = 0, len = pendingTasks.size(); i < len; ++i)
292: iw.println(pendingTasks.get(i));
293: iw.downIndent();
294: }
295:
296: for (int i = 0; i < initial_indent; ++i)
297: iw.downIndent();
298: iw.flush(); // useless, but I feel better
299: String out = sw.toString();
300: iw.close(); // useless, but I feel better;
301: return out;
302: } catch (IOException e) {
303: if (logger.isLoggable(MLevel.WARNING))
304: logger
305: .log(
306: MLevel.WARNING,
307: "Huh? An IOException when working with a StringWriter?!?",
308: e);
309: throw new RuntimeException(
310: "Huh? An IOException when working with a StringWriter?!? "
311: + e);
312: }
313: }
314:
315: // protected by ThreadPoolAsynchronousRunner.this' lock
316: // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
317: private void appendStatusString(StringBuffer sb) {
318: if (managed == null)
319: sb.append("[closed]");
320: else {
321: HashSet active = (HashSet) managed.clone();
322: active.removeAll(available);
323: sb.append("[num_managed_threads: ");
324: sb.append(managed.size());
325: sb.append(", num_active: ");
326: sb.append(active.size());
327: sb.append("; activeTasks: ");
328: boolean first = true;
329: for (Iterator ii = active.iterator(); ii.hasNext();) {
330: if (first)
331: first = false;
332: else
333: sb.append(", ");
334: PoolThread pt = (PoolThread) ii.next();
335: sb.append(pt.getCurrentTask());
336: sb.append(" (");
337: sb.append(pt.getName());
338: sb.append(')');
339: }
340: sb.append("; pendingTasks: ");
341: for (int i = 0, len = pendingTasks.size(); i < len; ++i) {
342: if (i != 0)
343: sb.append(", ");
344: sb.append(pendingTasks.get(i));
345: }
346: sb.append(']');
347: }
348: }
349:
350: // protected by ThreadPoolAsynchronousRunner.this' lock
351: // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock (or is ctor)
352: private void recreateThreadsAndTasks() {
353: if (this .managed != null) {
354: Date aboutNow = new Date();
355: for (Iterator ii = managed.iterator(); ii.hasNext();) {
356: PoolThread pt = (PoolThread) ii.next();
357: pt.gentleStop();
358: stoppedThreadsToStopDates.put(pt, aboutNow);
359: ensureReplacedThreadsProcessing();
360: }
361: }
362:
363: this .managed = new HashSet();
364: this .available = new HashSet();
365: this .pendingTasks = new LinkedList();
366: for (int i = 0; i < num_threads; ++i) {
367: Thread t = new PoolThread(i, daemon);
368: managed.add(t);
369: available.add(t);
370: t.start();
371: }
372: }
373:
374: // protected by ThreadPoolAsynchronousRunner.this' lock
375: // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
376: private void processReplacedThreads() {
377: long about_now = System.currentTimeMillis();
378: for (Iterator ii = stoppedThreadsToStopDates.keySet()
379: .iterator(); ii.hasNext();) {
380: PoolThread pt = (PoolThread) ii.next();
381: if (!pt.isAlive())
382: ii.remove();
383: else {
384: Date d = (Date) stoppedThreadsToStopDates.get(pt);
385: if ((about_now - d.getTime()) > interrupt_delay_after_apparent_deadlock) {
386: if (logger.isLoggable(MLevel.WARNING))
387: logger
388: .log(
389: MLevel.WARNING,
390: "Task "
391: + pt.getCurrentTask()
392: + " (in deadlocked PoolThread) failed to complete in maximum time "
393: + interrupt_delay_after_apparent_deadlock
394: + "ms. Trying interrupt().");
395: pt.interrupt();
396: ii.remove();
397: }
398: //else keep waiting...
399: }
400: if (stoppedThreadsToStopDates.isEmpty())
401: stopReplacedThreadsProcessing();
402: }
403: }
404:
405: // protected by ThreadPoolAsynchronousRunner.this' lock
406: // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
407: private void ensureReplacedThreadsProcessing() {
408: if (replacedThreadInterruptor == null) {
409: if (logger.isLoggable(MLevel.FINE))
410: logger
411: .fine("Apparently some threads have been replaced. Replacement thread processing enabled.");
412:
413: this .replacedThreadInterruptor = new ReplacedThreadInterruptor();
414: int replacedThreadProcessDelay = interrupt_delay_after_apparent_deadlock / 4;
415: myTimer.schedule(replacedThreadInterruptor,
416: replacedThreadProcessDelay,
417: replacedThreadProcessDelay);
418: }
419: }
420:
421: // protected by ThreadPoolAsynchronousRunner.this' lock
422: // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
423: private void stopReplacedThreadsProcessing() {
424: if (this .replacedThreadInterruptor != null) {
425: this .replacedThreadInterruptor.cancel();
426: this .replacedThreadInterruptor = null;
427:
428: if (logger.isLoggable(MLevel.FINE))
429: logger
430: .fine("Apparently all replaced threads have either completed their tasks or been interrupted(). "
431: + "Replacement thread processing cancelled.");
432: }
433: }
434:
435: // protected by ThreadPoolAsynchronousRunner.this' lock
436: // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
437: private void shuttingDown(PoolThread pt) {
438: if (managed != null && managed.contains(pt)) //we are not closed, and this was a thread in the current pool, not a replaced thread
439: {
440: managed.remove(pt);
441: available.remove(pt);
442: PoolThread replacement = new PoolThread(pt.getIndex(),
443: daemon);
444: managed.add(replacement);
445: available.add(replacement);
446: replacement.start();
447: }
448: }
449:
450: class PoolThread extends Thread {
451: // protected by ThreadPoolAsynchronousRunner.this' lock
452: Runnable currentTask;
453:
454: // protected by ThreadPoolAsynchronousRunner.this' lock
455: boolean should_stop;
456:
457: // post ctor immutable
458: int index;
459:
460: // not shared. only accessed by the PoolThread itself
461: TimerTask maxIndividualTaskTimeEnforcer = null;
462:
463: PoolThread(int index, boolean daemon) {
464: this .setName(this .getClass().getName() + "-#" + index);
465: this .setDaemon(daemon);
466: this .index = index;
467: }
468:
469: public int getIndex() {
470: return index;
471: }
472:
473: // protected by ThreadPoolAsynchronousRunner.this' lock
474: // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
475: void gentleStop() {
476: should_stop = true;
477: }
478:
479: // protected by ThreadPoolAsynchronousRunner.this' lock
480: // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
481: Runnable getCurrentTask() {
482: return currentTask;
483: }
484:
485: // no need to sync. data not shared
486: private/* synchronized */void setMaxIndividualTaskTimeEnforcer() {
487: this .maxIndividualTaskTimeEnforcer = new MaxIndividualTaskTimeEnforcer(
488: this );
489: myTimer.schedule(maxIndividualTaskTimeEnforcer,
490: max_individual_task_time);
491: }
492:
493: // no need to sync. data not shared
494: private/* synchronized */void cancelMaxIndividualTaskTimeEnforcer() {
495: this .maxIndividualTaskTimeEnforcer.cancel();
496: this .maxIndividualTaskTimeEnforcer = null;
497: }
498:
499: public void run() {
500: try {
501: thread_loop: while (true) {
502: Runnable myTask;
503: synchronized (ThreadPoolAsynchronousRunner.this ) {
504: while (!should_stop && pendingTasks.size() == 0)
505: ThreadPoolAsynchronousRunner.this
506: .wait(POLL_FOR_STOP_INTERVAL);
507: if (should_stop)
508: break thread_loop;
509:
510: if (!available.remove(this ))
511: throw new InternalError(
512: "An unavailable PoolThread tried to check itself out!!!");
513: myTask = (Runnable) pendingTasks.remove(0);
514: currentTask = myTask;
515: }
516: try {
517: if (max_individual_task_time > 0)
518: setMaxIndividualTaskTimeEnforcer();
519: myTask.run();
520: } catch (RuntimeException e) {
521: if (logger.isLoggable(MLevel.WARNING))
522: logger
523: .log(
524: MLevel.WARNING,
525: this
526: + " -- caught unexpected Exception while executing posted task.",
527: e);
528: //e.printStackTrace();
529: } finally {
530: if (maxIndividualTaskTimeEnforcer != null)
531: cancelMaxIndividualTaskTimeEnforcer();
532:
533: synchronized (ThreadPoolAsynchronousRunner.this ) {
534: if (should_stop)
535: break thread_loop;
536:
537: if (available != null
538: && !available.add(this ))
539: throw new InternalError(
540: "An apparently available PoolThread tried to check itself in!!!");
541: currentTask = null;
542: }
543: }
544: }
545: } catch (InterruptedException exc) {
546: // if ( Debug.TRACE > Debug.TRACE_NONE )
547: // System.err.println(this + " interrupted. Shutting down.");
548:
549: if (Debug.TRACE > Debug.TRACE_NONE
550: && logger.isLoggable(MLevel.FINE))
551: logger.fine(this + " interrupted. Shutting down.");
552: }
553:
554: synchronized (ThreadPoolAsynchronousRunner.this ) {
555: ThreadPoolAsynchronousRunner.this .shuttingDown(this );
556: }
557: }
558: }
559:
560: class DeadlockDetector extends TimerTask {
561: LinkedList last = null;
562: LinkedList current = null;
563:
564: public void run() {
565: boolean run_stray_tasks = false;
566: synchronized (ThreadPoolAsynchronousRunner.this ) {
567: if (pendingTasks.size() == 0) {
568: last = null;
569: return;
570: }
571:
572: current = (LinkedList) pendingTasks.clone();
573: if (current.equals(last)) {
574: //System.err.println(this + " -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending tasks!");
575: if (logger.isLoggable(MLevel.WARNING)) {
576: logger
577: .warning(this
578: + " -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending tasks!");
579: StringWriter sw = new StringWriter(4096);
580: PrintWriter pw = new PrintWriter(sw);
581: //StringBuffer sb = new StringBuffer( 512 );
582: //appendStatusString( sb );
583: //System.err.println( sb.toString() );
584: pw.print(this );
585: pw
586: .println(" -- APPARENT DEADLOCK!!! Complete Status: ");
587: pw.print(ThreadPoolAsynchronousRunner.this
588: .getMultiLineStatusString(1));
589: pw.println("Pool thread stack traces:");
590: String stackTraces = getStackTraces(1);
591: if (stackTraces == null)
592: pw
593: .println("\t[Stack traces of deadlocked task threads not available.]");
594: else
595: pw.println(stackTraces);
596: pw.flush(); //superfluous, but I feel better
597: logger.warning(sw.toString());
598: pw.close(); //superfluous, but I feel better
599: }
600: recreateThreadsAndTasks();
601: run_stray_tasks = true;
602: }
603: }
604: if (run_stray_tasks) {
605: AsynchronousRunner ar = new ThreadPerTaskAsynchronousRunner(
606: DFLT_MAX_EMERGENCY_THREADS,
607: max_individual_task_time);
608: for (Iterator ii = current.iterator(); ii.hasNext();)
609: ar.postRunnable((Runnable) ii.next());
610: ar.close(false); //tell the emergency runner to close itself when its tasks are complete
611: last = null;
612: } else
613: last = current;
614:
615: // under some circumstances, these lists seem to hold onto a lot of memory... presumably this
616: // is when long pending task lists build up for some reason... nevertheless, let's dereference
617: // things as soon as possible. [Thanks to Venkatesh Seetharamaiah for calling attention to this
618: // issue, and for documenting the source of object retention.]
619: current = null;
620: }
621: }
622:
623: class MaxIndividualTaskTimeEnforcer extends TimerTask {
624: PoolThread pt;
625: Thread interruptMe;
626: String threadStr;
627: String fixedTaskStr;
628:
629: MaxIndividualTaskTimeEnforcer(PoolThread pt) {
630: this .pt = pt;
631: this .interruptMe = pt;
632: this .threadStr = pt.toString();
633: this .fixedTaskStr = null;
634: }
635:
636: MaxIndividualTaskTimeEnforcer(Thread interruptMe,
637: String threadStr, String fixedTaskStr) {
638: this .pt = null;
639: this .interruptMe = interruptMe;
640: this .threadStr = threadStr;
641: this .fixedTaskStr = fixedTaskStr;
642: }
643:
644: public void run() {
645: String taskStr;
646:
647: if (fixedTaskStr != null)
648: taskStr = fixedTaskStr;
649: else if (pt != null) {
650: synchronized (ThreadPoolAsynchronousRunner.this ) {
651: taskStr = String.valueOf(pt.getCurrentTask());
652: }
653: } else
654: taskStr = "Unknown task?!";
655:
656: if (logger.isLoggable(MLevel.WARNING))
657: logger
658: .warning("A task has exceeded the maximum allowable task time. Will interrupt() thread ["
659: + threadStr
660: + "], with current task: "
661: + taskStr);
662:
663: interruptMe.interrupt();
664:
665: if (logger.isLoggable(MLevel.WARNING))
666: logger.warning("Thread [" + threadStr
667: + "] interrupted.");
668: }
669: }
670:
671: //not currently used...
672: private void runInEmergencyThread(final Runnable r) {
673: final Thread t = new Thread(r);
674: t.start();
675: if (max_individual_task_time > 0) {
676: TimerTask maxIndividualTaskTimeEnforcer = new MaxIndividualTaskTimeEnforcer(
677: t, t + " [One-off emergency thread!!!]", r
678: .toString());
679: myTimer.schedule(maxIndividualTaskTimeEnforcer,
680: max_individual_task_time);
681: }
682: }
683:
684: class ReplacedThreadInterruptor extends TimerTask {
685: public void run() {
686: synchronized (ThreadPoolAsynchronousRunner.this) {
687: processReplacedThreads();
688: }
689: }
690: }
691: }
|