Source Code Cross Referenced for BoundedThreadPool.java in  » Servlet-Container » jetty-modules » org » mortbay » thread » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Servlet Container » jetty modules » org.mortbay.thread 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // ========================================================================
002:        // Copyright 2004-2005 Mort Bay Consulting Pty. Ltd.
003:        // ------------------------------------------------------------------------
004:        // Licensed under the Apache License, Version 2.0 (the "License");
005:        // you may not use this file except in compliance with the License.
006:        // You may obtain a copy of the License at 
007:        // http://www.apache.org/licenses/LICENSE-2.0
008:        // Unless required by applicable law or agreed to in writing, software
009:        // distributed under the License is distributed on an "AS IS" BASIS,
010:        // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011:        // See the License for the specific language governing permissions and
012:        // limitations under the License.
013:        // ========================================================================
014:
015:        package org.mortbay.thread;
016:
017:        import java.io.Serializable;
018:        import java.util.ArrayList;
019:        import java.util.HashSet;
020:        import java.util.Iterator;
021:        import java.util.List;
022:        import java.util.Set;
023:
024:        import org.mortbay.component.AbstractLifeCycle;
025:        import org.mortbay.log.Log;
026:
027:        /* ------------------------------------------------------------ */
028:        /** A pool of threads.
029:         * <p>
030:         * Avoids the expense of thread creation by pooling threads after
031:         * their run methods exit for reuse.
032:         * <p>
033:         * If the maximum pool size is reached, jobs wait for a free thread.
034:         * By default there is no maximum pool size.  Idle threads timeout
035:         * and terminate until the minimum number of threads are running.
036:         * <p>
037:         * @author Greg Wilkins <gregw@mortbay.com>
038:         * @author Juancarlo Anez <juancarlo@modelistica.com>
039:         */
040:        public class BoundedThreadPool extends AbstractLifeCycle implements 
041:                Serializable, ThreadPool {
042:            private static int __id;
043:            private transient List _blocked;
044:
045:            private boolean _daemon;
046:
047:            private transient int _id;
048:
049:            private final String _lock = "LOCK";
050:            private final String _joinLock = "JOIN";
051:            private int _maxIdleTimeMs = 60000;
052:            private int _maxThreads = 255;
053:            private int _minThreads = 1;
054:            private int _lowThreads = 25;
055:            private String _name;
056:            int _priority = Thread.NORM_PRIORITY;
057:            private Set _threads;
058:            private List _idle;
059:            private boolean _warned = false;
060:            private long _lastShrink;
061:
062:            /* ------------------------------------------------------------------- */
063:            /* Construct
064:             */
065:            public BoundedThreadPool() {
066:                _name = "btpool" + __id++;
067:            }
068:
069:            /* ------------------------------------------------------------ */
070:            /** Get the number of idle threads in the pool.
071:             * @see #getThreads
072:             * @return Number of threads
073:             */
074:            public int getIdleThreads() {
075:                return _idle == null ? 0 : _idle.size();
076:            }
077:
078:            /* ------------------------------------------------------------ */
079:            /**
080:             * @return low resource threads threshhold
081:             */
082:            public int getLowThreads() {
083:                return _lowThreads;
084:            }
085:
086:            /* ------------------------------------------------------------ */
087:            /**
088:             * @param lowThreads low resource threads threshhold
089:             */
090:            public void setLowThreads(int lowThreads) {
091:                _lowThreads = lowThreads;
092:            }
093:
094:            /* ------------------------------------------------------------ */
095:            public boolean isLowOnThreads() {
096:                return _maxThreads - getThreads() + getIdleThreads() < _lowThreads;
097:            }
098:
099:            /* ------------------------------------------------------------ */
100:            /** Get the maximum thread idle time.
101:             * Delegated to the named or anonymous Pool.
102:             * @see #setMaxIdleTimeMs
103:             * @return Max idle time in ms.
104:             */
105:            public int getMaxIdleTimeMs() {
106:                return _maxIdleTimeMs;
107:            }
108:
109:            /* ------------------------------------------------------------ */
110:            /** Set the maximum number of threads.
111:             * Delegated to the named or anonymous Pool.
112:             * @see #setMaxThreads
113:             * @return maximum number of threads.
114:             */
115:            public int getMaxThreads() {
116:                return _maxThreads;
117:            }
118:
119:            /* ------------------------------------------------------------ */
120:            /** Get the minimum number of threads.
121:             * Delegated to the named or anonymous Pool.
122:             * @see #setMinThreads
123:             * @return minimum number of threads.
124:             */
125:            public int getMinThreads() {
126:                return _minThreads;
127:            }
128:
129:            /* ------------------------------------------------------------ */
130:            /** 
131:             * @return The name of the BoundedThreadPool.
132:             */
133:            public String getName() {
134:                return _name;
135:            }
136:
137:            /* ------------------------------------------------------------ */
138:            /** Get the number of threads in the pool.
139:             * @see #getIdleThreads
140:             * @return Number of threads
141:             */
142:            public int getThreads() {
143:                return _threads.size();
144:            }
145:
146:            /* ------------------------------------------------------------ */
147:            /** Get the priority of the pool threads.
148:             *  @return the priority of the pool threads.
149:             */
150:            public int getThreadsPriority() {
151:                return _priority;
152:            }
153:
154:            /* ------------------------------------------------------------ */
155:            /** 
156:             * Delegated to the named or anonymous Pool.
157:             */
158:            public boolean isDaemon() {
159:                return _daemon;
160:            }
161:
162:            /* ------------------------------------------------------------ */
163:            public void join() throws InterruptedException {
164:                synchronized (_joinLock) {
165:                    while (isRunning())
166:                        _joinLock.wait(getMaxIdleTimeMs());
167:                }
168:
169:                // TODO remove this semi busy loop!
170:                while (isStopping())
171:                    Thread.sleep(10);
172:            }
173:
174:            /* ------------------------------------------------------------ */
175:            protected void newThread() {
176:                synchronized (_lock) {
177:                    Thread thread = new PoolThread();
178:                    _threads.add(thread);
179:                    _idle.add(thread);
180:                    thread.setName(_name + "-" + _id++);
181:                    thread.start();
182:                }
183:            }
184:
185:            /* ------------------------------------------------------------ */
186:            /** Run job.
187:             * @return true if the job was given to a thread, false if no thread was
188:             * available.
189:             */
190:            public boolean dispatch(Runnable job) {
191:                boolean queued = false;
192:                synchronized (_lock) {
193:                    if (!isRunning())
194:                        return false;
195:
196:                    int blockMs = _maxIdleTimeMs;
197:
198:                    // Wait for an idle thread!
199:                    while (_idle.size() == 0) {
200:                        // Are we at max size?
201:                        if (_threads.size() < _maxThreads) {
202:                            // No
203:                            newThread();
204:                            break;
205:                        } else if (!_warned) {
206:                            _warned = true;
207:                            Log.debug("Out of threads for {}", this );
208:                        }
209:
210:                        // pool is full
211:                        if (blockMs < 0)
212:                            return false;
213:
214:                        // Block waiting
215:                        try {
216:                            _blocked.add(Thread.currentThread());
217:                            _lock.wait(blockMs);
218:                            blockMs = -1;
219:                        } catch (InterruptedException ie) {
220:                        } finally {
221:                            _blocked.remove(Thread.currentThread());
222:                        }
223:                    }
224:
225:                    PoolThread thread = (PoolThread) _idle
226:                            .remove(_idle.size() - 1);
227:                    thread.dispatch(job);
228:                    queued = true;
229:                }
230:
231:                if (_idle.size() == 0)
232:                    Thread.yield();
233:                return queued;
234:            }
235:
236:            /* ------------------------------------------------------------ */
237:            /** 
238:             * Delegated to the named or anonymous Pool.
239:             */
240:            public void setDaemon(boolean daemon) {
241:                _daemon = daemon;
242:            }
243:
244:            /* ------------------------------------------------------------ */
245:            /** Set the maximum thread idle time.
246:             * Threads that are idle for longer than this period may be
247:             * stopped.
248:             * Delegated to the named or anonymous Pool.
249:             * @see #getMaxIdleTimeMs
250:             * @param maxIdleTimeMs Max idle time in ms.
251:             */
252:            public void setMaxIdleTimeMs(int maxIdleTimeMs) {
253:                _maxIdleTimeMs = maxIdleTimeMs;
254:            }
255:
256:            /* ------------------------------------------------------------ */
257:            /** Set the maximum number of threads.
258:             * Delegated to the named or anonymous Pool.
259:             * @see #getMaxThreads
260:             * @param maxThreads maximum number of threads.
261:             */
262:            public void setMaxThreads(int maxThreads) {
263:                if (isStarted() && maxThreads < _minThreads)
264:                    throw new IllegalArgumentException("!minThreads<maxThreads");
265:                _maxThreads = maxThreads;
266:            }
267:
268:            /* ------------------------------------------------------------ */
269:            /** Set the minimum number of threads.
270:             * Delegated to the named or anonymous Pool.
271:             * @see #getMinThreads
272:             * @param minThreads minimum number of threads
273:             */
274:            public void setMinThreads(int minThreads) {
275:                if (isStarted()
276:                        && (minThreads <= 0 || minThreads > _maxThreads))
277:                    throw new IllegalArgumentException(
278:                            "!0<=minThreads<maxThreads");
279:                _minThreads = minThreads;
280:                synchronized (_lock) {
281:                    while (isStarted() && _threads.size() < _minThreads) {
282:                        newThread();
283:                    }
284:                }
285:            }
286:
287:            /* ------------------------------------------------------------ */
288:            /** 
289:             * @param name Name of the BoundedThreadPool to use when naming Threads.
290:             */
291:            public void setName(String name) {
292:                _name = name;
293:            }
294:
295:            /* ------------------------------------------------------------ */
296:            /** Set the priority of the pool threads.
297:             *  @param priority the new thread priority.
298:             */
299:            public void setThreadsPriority(int priority) {
300:                _priority = priority;
301:            }
302:
303:            /* ------------------------------------------------------------ */
304:            /* Start the BoundedThreadPool.
305:             * Construct the minimum number of threads.
306:             */
307:            protected void doStart() throws Exception {
308:                if (_maxThreads < _minThreads || _minThreads <= 0)
309:                    throw new IllegalArgumentException(
310:                            "!0<minThreads<maxThreads");
311:
312:                _threads = new HashSet();
313:                _idle = new ArrayList();
314:                _blocked = new ArrayList();
315:
316:                for (int i = 0; i < _minThreads; i++) {
317:                    newThread();
318:                }
319:            }
320:
321:            /* ------------------------------------------------------------ */
322:            /** Stop the BoundedThreadPool.
323:             * New jobs are no longer accepted,idle threads are interrupted
324:             * and stopJob is called on active threads.
325:             * The method then waits 
326:             * min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
327:             * stop, at which time killJob is called.
328:             */
329:            protected void doStop() throws Exception {
330:                super .doStop();
331:
332:                for (int i = 0; i < 100; i++) {
333:                    synchronized (_lock) {
334:                        Iterator iter = _threads.iterator();
335:                        while (iter.hasNext())
336:                            ((Thread) iter.next()).interrupt();
337:                    }
338:
339:                    Thread.yield();
340:                    if (_threads.size() == 0)
341:                        break;
342:
343:                    try {
344:                        Thread.sleep(i * 100);
345:                    } catch (InterruptedException e) {
346:                    }
347:                }
348:
349:                // TODO perhaps force stops
350:                if (_threads.size() > 0)
351:                    Log.warn(_threads.size() + " threads could not be stopped");
352:
353:                synchronized (_joinLock) {
354:                    _joinLock.notifyAll();
355:                }
356:            }
357:
358:            /* ------------------------------------------------------------ */
359:            /** Stop a Job.
360:             * This method is called by the Pool if a job needs to be stopped.
361:             * The default implementation does nothing and should be extended by a
362:             * derived thread pool class if special action is required.
363:             * @param thread The thread allocated to the job, or null if no thread allocated.
364:             * @param job The job object passed to run.
365:             */
366:            protected void stopJob(Thread thread, Object job) {
367:                thread.interrupt();
368:            }
369:
370:            /* ------------------------------------------------------------ */
371:            /** Pool Thread class.
372:             * The PoolThread allows the threads job to be
373:             * retrieved and active status to be indicated.
374:             */
375:            public class PoolThread extends Thread {
376:                Runnable _job = null;
377:
378:                PoolThread() {
379:                    setDaemon(_daemon);
380:                    setPriority(_priority);
381:                }
382:
383:                void dispatch(Runnable job) {
384:                    synchronized (this ) {
385:                        if (_job != null || job == null)
386:                            throw new IllegalStateException();
387:                        _job = job;
388:                        this .notify();
389:                    }
390:                }
391:
392:                /* ------------------------------------------------------------ */
393:                /** BoundedThreadPool run.
394:                 * Loop getting jobs and handling them until idle or stopped.
395:                 */
396:                public void run() {
397:
398:                    Runnable job = null;
399:                    try {
400:                        while (isRunning()) {
401:                            try {
402:                                synchronized (_lock) {
403:                                    if (job != null) {
404:                                        _idle.add(this );
405:                                        if (_idle.size() >= _minThreads)
406:                                            _warned = false;
407:                                        if (_blocked.size() > 0)
408:                                            ((Thread) _blocked.get(0))
409:                                                    .interrupt();
410:                                    }
411:                                }
412:
413:                                synchronized (this ) {
414:                                    job = null;
415:                                    if (_job == null)
416:                                        this .wait(getMaxIdleTimeMs());
417:                                    job = _job;
418:                                    _job = null;
419:                                }
420:
421:                                if (isRunning() && job != null)
422:                                    job.run();
423:
424:                                synchronized (_lock) {
425:                                    if (job == null) {
426:                                        long now = System.currentTimeMillis();
427:                                        if (_threads.size() > _maxThreads
428:                                                || // we have too many threads  OR
429:                                                _idle.size() - _blocked.size() > 0
430:                                                && // are there idle threads?
431:                                                _threads.size() > _minThreads && // are there more than min threads?
432:                                                (now - _lastShrink) > getMaxIdleTimeMs()) // have we shrunk recently?
433:                                        {
434:                                            _lastShrink = now;
435:                                            _idle.remove(this );
436:                                            return;
437:                                        }
438:                                    }
439:                                }
440:                            } catch (InterruptedException e) {
441:                                Log.ignore(e);
442:                                return;
443:                            }
444:                        }
445:                    } finally {
446:                        synchronized (_lock) {
447:                            _idle.remove(this );
448:                            _threads.remove(this );
449:                        }
450:
451:                        synchronized (this ) {
452:                            job = null;
453:                            job = _job;
454:                        }
455:
456:                        // catch all!
457:                        if (job != null && isRunning())
458:                            BoundedThreadPool.this.dispatch(job);
459:                    }
460:                }
461:            }
462:
463:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.