Source Code Cross Referenced for ThreadPoolAsynchronousRunner.java in  » Database-JDBC-Connection-Pool » c3p0 » com » mchange » v2 » async » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database JDBC Connection Pool » c3p0 » com.mchange.v2.async 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Distributed as part of c3p0 v.0.9.1.2
003:         *
004:         * Copyright (C) 2005 Machinery For Change, Inc.
005:         *
006:         * Author: Steve Waldman <swaldman@mchange.com>
007:         *
008:         * This library is free software; you can redistribute it and/or modify
009:         * it under the terms of the GNU Lesser General Public License version 2.1, as 
010:         * published by the Free Software Foundation.
011:         *
012:         * This software is distributed in the hope that it will be useful,
013:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
014:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
015:         * GNU Lesser General Public License for more details.
016:         *
017:         * You should have received a copy of the GNU Lesser General Public License
018:         * along with this software; see the file LICENSE.  If not, write to the
019:         * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
020:         * Boston, MA 02111-1307, USA.
021:         */
022:
023:        package com.mchange.v2.async;
024:
025:        import java.util.*;
026:        import com.mchange.v2.log.*;
027:
028:        import java.io.StringWriter;
029:        import java.io.PrintWriter;
030:        import java.io.IOException;
031:        import java.lang.reflect.Method;
032:        import com.mchange.v2.io.IndentedWriter;
033:        import com.mchange.v2.util.ResourceClosedException;
034:
035:        public final class ThreadPoolAsynchronousRunner implements 
036:                AsynchronousRunner {
037:            final static MLogger logger = MLog
038:                    .getLogger(ThreadPoolAsynchronousRunner.class);
039:
040:            final static int POLL_FOR_STOP_INTERVAL = 5000; //milliseconds
041:
042:            final static int DFLT_DEADLOCK_DETECTOR_INTERVAL = 10000; //milliseconds
043:            final static int DFLT_INTERRUPT_DELAY_AFTER_APPARENT_DEADLOCK = 60000; //milliseconds
044:            final static int DFLT_MAX_INDIVIDUAL_TASK_TIME = 0; //milliseconds, <= 0 means don't enforce a max task time
045:
046:            final static int DFLT_MAX_EMERGENCY_THREADS = 10;
047:
048:            int deadlock_detector_interval;
049:            int interrupt_delay_after_apparent_deadlock;
050:            int max_individual_task_time;
051:
052:            int num_threads;
053:            boolean daemon;
054:            HashSet managed;
055:            HashSet available;
056:            LinkedList pendingTasks;
057:
058:            Timer myTimer;
059:            boolean should_cancel_timer;
060:
061:            TimerTask deadlockDetector = new DeadlockDetector();
062:            TimerTask replacedThreadInterruptor = null;
063:
064:            Map stoppedThreadsToStopDates = new HashMap();
065:
066:            private ThreadPoolAsynchronousRunner(int num_threads,
067:                    boolean daemon, int max_individual_task_time,
068:                    int deadlock_detector_interval,
069:                    int interrupt_delay_after_apparent_deadlock, Timer myTimer,
070:                    boolean should_cancel_timer) {
071:                this .num_threads = num_threads;
072:                this .daemon = daemon;
073:                this .max_individual_task_time = max_individual_task_time;
074:                this .deadlock_detector_interval = deadlock_detector_interval;
075:                this .interrupt_delay_after_apparent_deadlock = interrupt_delay_after_apparent_deadlock;
076:                this .myTimer = myTimer;
077:                this .should_cancel_timer = should_cancel_timer;
078:
079:                recreateThreadsAndTasks();
080:
081:                myTimer.schedule(deadlockDetector, deadlock_detector_interval,
082:                        deadlock_detector_interval);
083:
084:            }
085:
086:            public ThreadPoolAsynchronousRunner(int num_threads,
087:                    boolean daemon, int max_individual_task_time,
088:                    int deadlock_detector_interval,
089:                    int interrupt_delay_after_apparent_deadlock, Timer myTimer) {
090:                this (num_threads, daemon, max_individual_task_time,
091:                        deadlock_detector_interval,
092:                        interrupt_delay_after_apparent_deadlock, myTimer, false);
093:            }
094:
095:            public ThreadPoolAsynchronousRunner(int num_threads,
096:                    boolean daemon, int max_individual_task_time,
097:                    int deadlock_detector_interval,
098:                    int interrupt_delay_after_apparent_deadlock) {
099:                this (num_threads, daemon, max_individual_task_time,
100:                        deadlock_detector_interval,
101:                        interrupt_delay_after_apparent_deadlock,
102:                        new Timer(true), true);
103:            }
104:
105:            public ThreadPoolAsynchronousRunner(int num_threads,
106:                    boolean daemon, Timer sharedTimer) {
107:                this (num_threads, daemon, DFLT_MAX_INDIVIDUAL_TASK_TIME,
108:                        DFLT_DEADLOCK_DETECTOR_INTERVAL,
109:                        DFLT_INTERRUPT_DELAY_AFTER_APPARENT_DEADLOCK,
110:                        sharedTimer, false);
111:            }
112:
113:            public ThreadPoolAsynchronousRunner(int num_threads, boolean daemon) {
114:                this (num_threads, daemon, DFLT_MAX_INDIVIDUAL_TASK_TIME,
115:                        DFLT_DEADLOCK_DETECTOR_INTERVAL,
116:                        DFLT_INTERRUPT_DELAY_AFTER_APPARENT_DEADLOCK,
117:                        new Timer(true), true);
118:            }
119:
120:            public synchronized void postRunnable(Runnable r) {
121:                try {
122:                    pendingTasks.add(r);
123:                    this .notifyAll();
124:                } catch (NullPointerException e) {
125:                    //e.printStackTrace();
126:                    if (Debug.DEBUG) {
127:                        if (logger.isLoggable(MLevel.FINE))
128:                            logger
129:                                    .log(
130:                                            MLevel.FINE,
131:                                            "NullPointerException while posting Runnable -- Probably we're closed.",
132:                                            e);
133:                    }
134:                    throw new ResourceClosedException(
135:                            "Attempted to use a ThreadPoolAsynchronousRunner in a closed or broken state.");
136:                }
137:            }
138:
139:            public synchronized int getThreadCount() {
140:                return managed.size();
141:            }
142:
143:            public void close(boolean skip_remaining_tasks) {
144:                synchronized (this ) {
145:                    if (managed == null)
146:                        return;
147:                    deadlockDetector.cancel();
148:                    //replacedThreadInterruptor.cancel();
149:                    if (should_cancel_timer)
150:                        myTimer.cancel();
151:                    myTimer = null;
152:                    for (Iterator ii = managed.iterator(); ii.hasNext();) {
153:                        PoolThread stopMe = (PoolThread) ii.next();
154:                        stopMe.gentleStop();
155:                        if (skip_remaining_tasks)
156:                            stopMe.interrupt();
157:                    }
158:                    managed = null;
159:
160:                    if (!skip_remaining_tasks) {
161:                        for (Iterator ii = pendingTasks.iterator(); ii
162:                                .hasNext();) {
163:                            Runnable r = (Runnable) ii.next();
164:                            new Thread(r).start();
165:                            ii.remove();
166:                        }
167:                    }
168:                    available = null;
169:                    pendingTasks = null;
170:                }
171:            }
172:
173:            public void close() {
174:                close(true);
175:            }
176:
177:            public synchronized int getActiveCount() {
178:                return managed.size() - available.size();
179:            }
180:
181:            public synchronized int getIdleCount() {
182:                return available.size();
183:            }
184:
185:            public synchronized int getPendingTaskCount() {
186:                return pendingTasks.size();
187:            }
188:
189:            public synchronized String getStatus() {
190:                /*
191:                StringBuffer sb = new StringBuffer( 512 );
192:                sb.append( this.toString() );
193:                sb.append( ' ' );
194:                appendStatusString( sb );
195:                return sb.toString();
196:                 */
197:
198:                return getMultiLineStatusString();
199:            }
200:
201:            // done reflectively for jdk 1.3/1.4 compatability
202:            public synchronized String getStackTraces() {
203:                return getStackTraces(0);
204:            }
205:
206:            // protected by ThreadPoolAsynchronousRunner.this' lock
207:            // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
208:            private String getStackTraces(int initial_indent) {
209:                if (managed == null)
210:                    return null;
211:
212:                try {
213:                    Method m = Thread.class.getMethod("getStackTrace", null);
214:
215:                    StringWriter sw = new StringWriter(2048);
216:                    IndentedWriter iw = new IndentedWriter(sw);
217:                    for (int i = 0; i < initial_indent; ++i)
218:                        iw.upIndent();
219:                    for (Iterator ii = managed.iterator(); ii.hasNext();) {
220:                        Object poolThread = ii.next();
221:                        Object[] stackTraces = (Object[]) m.invoke(poolThread,
222:                                null);
223:                        iw.println(poolThread);
224:                        iw.upIndent();
225:                        for (int i = 0, len = stackTraces.length; i < len; ++i)
226:                            iw.println(stackTraces[i]);
227:                        iw.downIndent();
228:                    }
229:                    for (int i = 0; i < initial_indent; ++i)
230:                        iw.downIndent();
231:                    iw.flush(); // useless, but I feel better
232:                    String out = sw.toString();
233:                    iw.close(); // useless, but I feel better;
234:                    return out;
235:                } catch (NoSuchMethodException e) {
236:                    if (logger.isLoggable(MLevel.FINE))
237:                        logger
238:                                .fine(this 
239:                                        + ": strack traces unavailable because this is a pre-Java 1.5 VM.");
240:                    return null;
241:                } catch (Exception e) {
242:                    if (logger.isLoggable(MLevel.FINE))
243:                        logger
244:                                .log(
245:                                        MLevel.FINE,
246:                                        this 
247:                                                + ": An Exception occurred while trying to extract PoolThread stack traces.",
248:                                        e);
249:                    return null;
250:                }
251:            }
252:
253:            public synchronized String getMultiLineStatusString() {
254:                return this .getMultiLineStatusString(0);
255:            }
256:
257:            // protected by ThreadPoolAsynchronousRunner.this' lock
258:            // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
259:            private String getMultiLineStatusString(int initial_indent) {
260:                try {
261:                    StringWriter sw = new StringWriter(2048);
262:                    IndentedWriter iw = new IndentedWriter(sw);
263:
264:                    for (int i = 0; i < initial_indent; ++i)
265:                        iw.upIndent();
266:
267:                    if (managed == null) {
268:                        iw.print("[");
269:                        iw.print(this );
270:                        iw.println(" closed.]");
271:                    } else {
272:                        HashSet active = (HashSet) managed.clone();
273:                        active.removeAll(available);
274:
275:                        iw.print("Managed Threads: ");
276:                        iw.println(managed.size());
277:                        iw.print("Active Threads: ");
278:                        iw.println(active.size());
279:                        iw.println("Active Tasks: ");
280:                        iw.upIndent();
281:                        for (Iterator ii = active.iterator(); ii.hasNext();) {
282:                            PoolThread pt = (PoolThread) ii.next();
283:                            iw.print(pt.getCurrentTask());
284:                            iw.print(" (");
285:                            iw.print(pt.getName());
286:                            iw.println(')');
287:                        }
288:                        iw.downIndent();
289:                        iw.println("Pending Tasks: ");
290:                        iw.upIndent();
291:                        for (int i = 0, len = pendingTasks.size(); i < len; ++i)
292:                            iw.println(pendingTasks.get(i));
293:                        iw.downIndent();
294:                    }
295:
296:                    for (int i = 0; i < initial_indent; ++i)
297:                        iw.downIndent();
298:                    iw.flush(); // useless, but I feel better
299:                    String out = sw.toString();
300:                    iw.close(); // useless, but I feel better;
301:                    return out;
302:                } catch (IOException e) {
303:                    if (logger.isLoggable(MLevel.WARNING))
304:                        logger
305:                                .log(
306:                                        MLevel.WARNING,
307:                                        "Huh? An IOException when working with a StringWriter?!?",
308:                                        e);
309:                    throw new RuntimeException(
310:                            "Huh? An IOException when working with a StringWriter?!? "
311:                                    + e);
312:                }
313:            }
314:
315:            // protected by ThreadPoolAsynchronousRunner.this' lock
316:            // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
317:            private void appendStatusString(StringBuffer sb) {
318:                if (managed == null)
319:                    sb.append("[closed]");
320:                else {
321:                    HashSet active = (HashSet) managed.clone();
322:                    active.removeAll(available);
323:                    sb.append("[num_managed_threads: ");
324:                    sb.append(managed.size());
325:                    sb.append(", num_active: ");
326:                    sb.append(active.size());
327:                    sb.append("; activeTasks: ");
328:                    boolean first = true;
329:                    for (Iterator ii = active.iterator(); ii.hasNext();) {
330:                        if (first)
331:                            first = false;
332:                        else
333:                            sb.append(", ");
334:                        PoolThread pt = (PoolThread) ii.next();
335:                        sb.append(pt.getCurrentTask());
336:                        sb.append(" (");
337:                        sb.append(pt.getName());
338:                        sb.append(')');
339:                    }
340:                    sb.append("; pendingTasks: ");
341:                    for (int i = 0, len = pendingTasks.size(); i < len; ++i) {
342:                        if (i != 0)
343:                            sb.append(", ");
344:                        sb.append(pendingTasks.get(i));
345:                    }
346:                    sb.append(']');
347:                }
348:            }
349:
350:            // protected by ThreadPoolAsynchronousRunner.this' lock
351:            // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock (or is ctor)
352:            private void recreateThreadsAndTasks() {
353:                if (this .managed != null) {
354:                    Date aboutNow = new Date();
355:                    for (Iterator ii = managed.iterator(); ii.hasNext();) {
356:                        PoolThread pt = (PoolThread) ii.next();
357:                        pt.gentleStop();
358:                        stoppedThreadsToStopDates.put(pt, aboutNow);
359:                        ensureReplacedThreadsProcessing();
360:                    }
361:                }
362:
363:                this .managed = new HashSet();
364:                this .available = new HashSet();
365:                this .pendingTasks = new LinkedList();
366:                for (int i = 0; i < num_threads; ++i) {
367:                    Thread t = new PoolThread(i, daemon);
368:                    managed.add(t);
369:                    available.add(t);
370:                    t.start();
371:                }
372:            }
373:
374:            // protected by ThreadPoolAsynchronousRunner.this' lock
375:            // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
376:            private void processReplacedThreads() {
377:                long about_now = System.currentTimeMillis();
378:                for (Iterator ii = stoppedThreadsToStopDates.keySet()
379:                        .iterator(); ii.hasNext();) {
380:                    PoolThread pt = (PoolThread) ii.next();
381:                    if (!pt.isAlive())
382:                        ii.remove();
383:                    else {
384:                        Date d = (Date) stoppedThreadsToStopDates.get(pt);
385:                        if ((about_now - d.getTime()) > interrupt_delay_after_apparent_deadlock) {
386:                            if (logger.isLoggable(MLevel.WARNING))
387:                                logger
388:                                        .log(
389:                                                MLevel.WARNING,
390:                                                "Task "
391:                                                        + pt.getCurrentTask()
392:                                                        + " (in deadlocked PoolThread) failed to complete in maximum time "
393:                                                        + interrupt_delay_after_apparent_deadlock
394:                                                        + "ms. Trying interrupt().");
395:                            pt.interrupt();
396:                            ii.remove();
397:                        }
398:                        //else keep waiting...
399:                    }
400:                    if (stoppedThreadsToStopDates.isEmpty())
401:                        stopReplacedThreadsProcessing();
402:                }
403:            }
404:
405:            // protected by ThreadPoolAsynchronousRunner.this' lock
406:            // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
407:            private void ensureReplacedThreadsProcessing() {
408:                if (replacedThreadInterruptor == null) {
409:                    if (logger.isLoggable(MLevel.FINE))
410:                        logger
411:                                .fine("Apparently some threads have been replaced. Replacement thread processing enabled.");
412:
413:                    this .replacedThreadInterruptor = new ReplacedThreadInterruptor();
414:                    int replacedThreadProcessDelay = interrupt_delay_after_apparent_deadlock / 4;
415:                    myTimer.schedule(replacedThreadInterruptor,
416:                            replacedThreadProcessDelay,
417:                            replacedThreadProcessDelay);
418:                }
419:            }
420:
421:            // protected by ThreadPoolAsynchronousRunner.this' lock
422:            // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
423:            private void stopReplacedThreadsProcessing() {
424:                if (this .replacedThreadInterruptor != null) {
425:                    this .replacedThreadInterruptor.cancel();
426:                    this .replacedThreadInterruptor = null;
427:
428:                    if (logger.isLoggable(MLevel.FINE))
429:                        logger
430:                                .fine("Apparently all replaced threads have either completed their tasks or been interrupted(). "
431:                                        + "Replacement thread processing cancelled.");
432:                }
433:            }
434:
435:            // protected by ThreadPoolAsynchronousRunner.this' lock
436:            // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
437:            private void shuttingDown(PoolThread pt) {
438:                if (managed != null && managed.contains(pt)) //we are not closed, and this was a thread in the current pool, not a replaced thread
439:                {
440:                    managed.remove(pt);
441:                    available.remove(pt);
442:                    PoolThread replacement = new PoolThread(pt.getIndex(),
443:                            daemon);
444:                    managed.add(replacement);
445:                    available.add(replacement);
446:                    replacement.start();
447:                }
448:            }
449:
450:            class PoolThread extends Thread {
451:                // protected by ThreadPoolAsynchronousRunner.this' lock
452:                Runnable currentTask;
453:
454:                // protected by ThreadPoolAsynchronousRunner.this' lock
455:                boolean should_stop;
456:
457:                // post ctor immutable
458:                int index;
459:
460:                // not shared. only accessed by the PoolThread itself
461:                TimerTask maxIndividualTaskTimeEnforcer = null;
462:
463:                PoolThread(int index, boolean daemon) {
464:                    this .setName(this .getClass().getName() + "-#" + index);
465:                    this .setDaemon(daemon);
466:                    this .index = index;
467:                }
468:
469:                public int getIndex() {
470:                    return index;
471:                }
472:
473:                // protected by ThreadPoolAsynchronousRunner.this' lock
474:                // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
475:                void gentleStop() {
476:                    should_stop = true;
477:                }
478:
479:                // protected by ThreadPoolAsynchronousRunner.this' lock
480:                // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
481:                Runnable getCurrentTask() {
482:                    return currentTask;
483:                }
484:
485:                // no need to sync. data not shared
486:                private/* synchronized */void setMaxIndividualTaskTimeEnforcer() {
487:                    this .maxIndividualTaskTimeEnforcer = new MaxIndividualTaskTimeEnforcer(
488:                            this );
489:                    myTimer.schedule(maxIndividualTaskTimeEnforcer,
490:                            max_individual_task_time);
491:                }
492:
493:                // no need to sync. data not shared
494:                private/* synchronized */void cancelMaxIndividualTaskTimeEnforcer() {
495:                    this .maxIndividualTaskTimeEnforcer.cancel();
496:                    this .maxIndividualTaskTimeEnforcer = null;
497:                }
498:
499:                public void run() {
500:                    try {
501:                        thread_loop: while (true) {
502:                            Runnable myTask;
503:                            synchronized (ThreadPoolAsynchronousRunner.this ) {
504:                                while (!should_stop && pendingTasks.size() == 0)
505:                                    ThreadPoolAsynchronousRunner.this 
506:                                            .wait(POLL_FOR_STOP_INTERVAL);
507:                                if (should_stop)
508:                                    break thread_loop;
509:
510:                                if (!available.remove(this ))
511:                                    throw new InternalError(
512:                                            "An unavailable PoolThread tried to check itself out!!!");
513:                                myTask = (Runnable) pendingTasks.remove(0);
514:                                currentTask = myTask;
515:                            }
516:                            try {
517:                                if (max_individual_task_time > 0)
518:                                    setMaxIndividualTaskTimeEnforcer();
519:                                myTask.run();
520:                            } catch (RuntimeException e) {
521:                                if (logger.isLoggable(MLevel.WARNING))
522:                                    logger
523:                                            .log(
524:                                                    MLevel.WARNING,
525:                                                    this 
526:                                                            + " -- caught unexpected Exception while executing posted task.",
527:                                                    e);
528:                                //e.printStackTrace();
529:                            } finally {
530:                                if (maxIndividualTaskTimeEnforcer != null)
531:                                    cancelMaxIndividualTaskTimeEnforcer();
532:
533:                                synchronized (ThreadPoolAsynchronousRunner.this ) {
534:                                    if (should_stop)
535:                                        break thread_loop;
536:
537:                                    if (available != null
538:                                            && !available.add(this ))
539:                                        throw new InternalError(
540:                                                "An apparently available PoolThread tried to check itself in!!!");
541:                                    currentTask = null;
542:                                }
543:                            }
544:                        }
545:                    } catch (InterruptedException exc) {
546:                        //              if ( Debug.TRACE > Debug.TRACE_NONE )
547:                        //              System.err.println(this + " interrupted. Shutting down.");
548:
549:                        if (Debug.TRACE > Debug.TRACE_NONE
550:                                && logger.isLoggable(MLevel.FINE))
551:                            logger.fine(this  + " interrupted. Shutting down.");
552:                    }
553:
554:                    synchronized (ThreadPoolAsynchronousRunner.this ) {
555:                        ThreadPoolAsynchronousRunner.this .shuttingDown(this );
556:                    }
557:                }
558:            }
559:
560:            class DeadlockDetector extends TimerTask {
561:                LinkedList last = null;
562:                LinkedList current = null;
563:
564:                public void run() {
565:                    boolean run_stray_tasks = false;
566:                    synchronized (ThreadPoolAsynchronousRunner.this ) {
567:                        if (pendingTasks.size() == 0) {
568:                            last = null;
569:                            return;
570:                        }
571:
572:                        current = (LinkedList) pendingTasks.clone();
573:                        if (current.equals(last)) {
574:                            //System.err.println(this + " -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending tasks!");
575:                            if (logger.isLoggable(MLevel.WARNING)) {
576:                                logger
577:                                        .warning(this 
578:                                                + " -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending tasks!");
579:                                StringWriter sw = new StringWriter(4096);
580:                                PrintWriter pw = new PrintWriter(sw);
581:                                //StringBuffer sb = new StringBuffer( 512 );
582:                                //appendStatusString( sb );
583:                                //System.err.println( sb.toString() );
584:                                pw.print(this );
585:                                pw
586:                                        .println(" -- APPARENT DEADLOCK!!! Complete Status: ");
587:                                pw.print(ThreadPoolAsynchronousRunner.this 
588:                                        .getMultiLineStatusString(1));
589:                                pw.println("Pool thread stack traces:");
590:                                String stackTraces = getStackTraces(1);
591:                                if (stackTraces == null)
592:                                    pw
593:                                            .println("\t[Stack traces of deadlocked task threads not available.]");
594:                                else
595:                                    pw.println(stackTraces);
596:                                pw.flush(); //superfluous, but I feel better
597:                                logger.warning(sw.toString());
598:                                pw.close(); //superfluous, but I feel better
599:                            }
600:                            recreateThreadsAndTasks();
601:                            run_stray_tasks = true;
602:                        }
603:                    }
604:                    if (run_stray_tasks) {
605:                        AsynchronousRunner ar = new ThreadPerTaskAsynchronousRunner(
606:                                DFLT_MAX_EMERGENCY_THREADS,
607:                                max_individual_task_time);
608:                        for (Iterator ii = current.iterator(); ii.hasNext();)
609:                            ar.postRunnable((Runnable) ii.next());
610:                        ar.close(false); //tell the emergency runner to close itself when its tasks are complete
611:                        last = null;
612:                    } else
613:                        last = current;
614:
615:                    // under some circumstances, these lists seem to hold onto a lot of memory... presumably this
616:                    // is when long pending task lists build up for some reason... nevertheless, let's dereference
617:                    // things as soon as possible. [Thanks to Venkatesh Seetharamaiah for calling attention to this
618:                    // issue, and for documenting the source of object retention.]
619:                    current = null;
620:                }
621:            }
622:
623:            class MaxIndividualTaskTimeEnforcer extends TimerTask {
624:                PoolThread pt;
625:                Thread interruptMe;
626:                String threadStr;
627:                String fixedTaskStr;
628:
629:                MaxIndividualTaskTimeEnforcer(PoolThread pt) {
630:                    this .pt = pt;
631:                    this .interruptMe = pt;
632:                    this .threadStr = pt.toString();
633:                    this .fixedTaskStr = null;
634:                }
635:
636:                MaxIndividualTaskTimeEnforcer(Thread interruptMe,
637:                        String threadStr, String fixedTaskStr) {
638:                    this .pt = null;
639:                    this .interruptMe = interruptMe;
640:                    this .threadStr = threadStr;
641:                    this .fixedTaskStr = fixedTaskStr;
642:                }
643:
644:                public void run() {
645:                    String taskStr;
646:
647:                    if (fixedTaskStr != null)
648:                        taskStr = fixedTaskStr;
649:                    else if (pt != null) {
650:                        synchronized (ThreadPoolAsynchronousRunner.this ) {
651:                            taskStr = String.valueOf(pt.getCurrentTask());
652:                        }
653:                    } else
654:                        taskStr = "Unknown task?!";
655:
656:                    if (logger.isLoggable(MLevel.WARNING))
657:                        logger
658:                                .warning("A task has exceeded the maximum allowable task time. Will interrupt() thread ["
659:                                        + threadStr
660:                                        + "], with current task: "
661:                                        + taskStr);
662:
663:                    interruptMe.interrupt();
664:
665:                    if (logger.isLoggable(MLevel.WARNING))
666:                        logger.warning("Thread [" + threadStr
667:                                + "] interrupted.");
668:                }
669:            }
670:
671:            //not currently used...
672:            private void runInEmergencyThread(final Runnable r) {
673:                final Thread t = new Thread(r);
674:                t.start();
675:                if (max_individual_task_time > 0) {
676:                    TimerTask maxIndividualTaskTimeEnforcer = new MaxIndividualTaskTimeEnforcer(
677:                            t, t + " [One-off emergency thread!!!]", r
678:                                    .toString());
679:                    myTimer.schedule(maxIndividualTaskTimeEnforcer,
680:                            max_individual_task_time);
681:                }
682:            }
683:
684:            class ReplacedThreadInterruptor extends TimerTask {
685:                public void run() {
686:                    synchronized (ThreadPoolAsynchronousRunner.this) {
687:                        processReplacedThreads();
688:                    }
689:                }
690:            }
691:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.