Source Code Cross Referenced for ReusableThread.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » util » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.util 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: ReusableThread.java,v 1.7.16.1 2007/03/08 10:22:05 belaban Exp $
002:
003:        package org.jgroups.util;
004:
005:        import org.apache.commons.logging.Log;
006:        import org.apache.commons.logging.LogFactory;
007:
008:        /**
009:         * Reusable thread class. Instead of creating a new thread per task, this instance can be reused
010:         * to run different tasks in turn. This is done by looping and assigning the Runnable task objects
011:         * whose <code>run</code> method is then called.<br>
012:         * Tasks are Runnable objects and should be prepared to terminate when they receive an
013:         * InterruptedException. This is thrown by the stop() method.<br>
014:         * <p/>
015:         * The following situations have to be tested:
016:         * <ol>
017:         * <li>ReusableThread is started. Then, brefore assigning a task, it is stopped again
018:         * <li>ReusableThread is started, assigned a long running task. Then, before task is done,
019:         * stop() is called
020:         * <li>ReusableThread is started, assigned a task. Then waitUntilDone() is called, then stop()
021:         * <li>ReusableThread is started, assigned a number of tasks (waitUntilDone() called between tasks),
022:         * then stopped
023:         * <li>ReusableThread is started, assigned a task
024:         * </ol>
025:         *
026:         * @author Bela Ban
027:         */
028:        public class ReusableThread implements  Runnable {
029:            volatile Thread thread = null; // thread that works on the task
030:            Runnable task = null; // task assigned to thread
031:            private ThreadLocalListener tl_listener = null;
032:            String thread_name = "ReusableThread";
033:            volatile boolean suspended = false;
034:            protected static final Log log = LogFactory
035:                    .getLog(ReusableThread.class);
036:            static final long TASK_JOIN_TIME = 3000; // wait 3 secs for an interrupted thread to terminate
037:
038:            public ReusableThread() {
039:            }
040:
041:            public ReusableThread(String thread_name) {
042:                this .thread_name = thread_name;
043:            }
044:
045:            public boolean done() {
046:                return task == null;
047:            }
048:
049:            public boolean available() {
050:                return done();
051:            }
052:
053:            public boolean isAlive() {
054:                synchronized (this ) {
055:                    return thread != null && thread.isAlive();
056:                }
057:            }
058:
059:            /**
060:             * Will always be called from synchronized method, no need to do our own synchronization
061:             */
062:            public void start() {
063:                if (thread == null || (thread != null && !thread.isAlive())) {
064:                    thread = new Thread(this , thread_name);
065:                    thread.setDaemon(true);
066:                    thread.start();
067:                }
068:            }
069:
070:            /**
071:             * Stops the thread by setting thread=null and interrupting it. The run() method catches the
072:             * InterruptedException and checks whether thread==null. If this is the case, it will terminate
073:             */
074:            public void stop() {
075:                Thread tmp = null;
076:
077:                if (log.isTraceEnabled())
078:                    log.trace("entering THIS");
079:                synchronized (this ) {
080:                    if (log.isTraceEnabled())
081:                        log.trace("entered THIS (thread=" + printObj(thread)
082:                                + ", task=" + printObj(task) + ", suspended="
083:                                + suspended + ')');
084:                    if (thread != null && thread.isAlive()) {
085:                        tmp = thread;
086:                        thread = null; // signals the thread to stop
087:                        task = null;
088:                        if (log.isTraceEnabled())
089:                            log.trace("notifying thread");
090:                        notifyAll();
091:                        if (log.isTraceEnabled())
092:                            log.trace("notifying thread completed");
093:                    }
094:                    thread = null;
095:                    task = null;
096:                }
097:
098:                if (tmp != null && tmp.isAlive()) {
099:                    long s1 = System.currentTimeMillis(), s2 = 0;
100:                    if (log.isTraceEnabled())
101:                        log.trace("join(" + TASK_JOIN_TIME + ')');
102:
103:                    tmp.interrupt();
104:
105:                    try {
106:                        tmp.join(TASK_JOIN_TIME);
107:                    } catch (Exception e) {
108:                    }
109:                    s2 = System.currentTimeMillis();
110:                    if (log.isTraceEnabled())
111:                        log.trace("join(" + TASK_JOIN_TIME + ") completed in "
112:                                + (s2 - s1));
113:                    if (tmp.isAlive())
114:                        if (log.isErrorEnabled())
115:                            log.error("thread is still alive");
116:                    tmp = null;
117:                }
118:            }
119:
120:            /**
121:             * Suspends the thread. Does nothing if already suspended. If a thread is waiting to be assigned a task, or
122:             * is currently running a (possibly long-running) task, then it will be suspended the next time it
123:             * waits for suspended==false (second wait-loop in run())
124:             */
125:
126:            public void suspend() {
127:                synchronized (this ) {
128:                    if (log.isTraceEnabled())
129:                        log.trace("suspended=" + suspended + ", task="
130:                                + printObj(task));
131:                    if (!suspended) {
132:                        suspended = true;
133:                    }
134:                }
135:            }
136:
137:            /**
138:             * Resumes the thread. Noop if not suspended
139:             */
140:            public void resume() {
141:                synchronized (this ) {
142:                    suspended = false;
143:                    notifyAll(); // notifies run(): the wait on suspend() is released
144:                }
145:            }
146:
147:            /**
148:             * Assigns a task to the thread. If the thread is not running, it will be started. It it is
149:             * already working on a task, it will reject the new task. Returns true if task could be
150:             * assigned auccessfully
151:             */
152:            public boolean assignTask(Runnable t) {
153:                synchronized (this ) {
154:                    start(); // creates and starts the thread if not yet running
155:                    if (task == null) {
156:                        task = t;
157:                        notifyAll(); // signals run() to start working (first wait-loop)
158:                        return true;
159:                    } else {
160:                        if (log.isErrorEnabled())
161:                            log
162:                                    .error("already working on a thread: current_task="
163:                                            + task
164:                                            + ", new task="
165:                                            + t
166:                                            + ", thread="
167:                                            + thread
168:                                            + ", is alive="
169:                                            + (thread != null ? String
170:                                                    .valueOf(thread.isAlive())
171:                                                    : "null"));
172:                        return false;
173:                    }
174:                }
175:            }
176:
177:            /**
178:             * Assigns a ThreadLocalListener to the current ReusableThread. The ThreadLocalListener
179:             * sets ThreadLocal's values for the lifetime of the task for the thread that is used
180:             * to run the task. The ThreadLocalListener will reset values upon the task returning
181:             * and at this point will be deleted.
182:             */
183:            public void assignThreadLocalListener(
184:                    ThreadLocalListener tl_listener) {
185:                this .tl_listener = tl_listener;
186:            }
187:
188:            /**
189:             * Delicate piece of code (means very important :-)). Works as follows: loops until stop is true.
190:             * Waits in a loop until task is assigned. Then runs the task and notifies waiters that it's done
191:             * when task is completed. Then returns to the first loop to wait for more work. Does so until
192:             * stop() is called, which sets stop=true and interrupts the thread. If waiting for a task, the
193:             * thread terminates. If running a task, the task is interrupted, and the thread terminates. If the
194:             * task is not interrupible, the stop() method will wait for 3 secs (join on the thread), then return.
195:             * This means that the run() method of the task will complete and only then will the thread be
196:             * garbage-collected.
197:             */
198:            public void run() {
199:                while (thread != null) { // Stop sets thread=null
200:                    try {
201:                        if (log.isTraceEnabled())
202:                            log.trace("entering ASSIGN");
203:                        synchronized (this ) {
204:                            if (log.isTraceEnabled())
205:                                log.trace("entered ASSIGN (task="
206:                                        + printObj(task) + ", thread="
207:                                        + printObj(thread) + ')');
208:
209:                            while (task == null && thread != null) { // first wait-loop: wait for task to be assigned (assignTask())
210:                                if (log.isTraceEnabled())
211:                                    log.trace("wait ASSIGN");
212:                                wait();
213:                                if (log.isTraceEnabled())
214:                                    log.trace("wait ASSIGN completed");
215:                            }
216:                        }
217:                    } catch (InterruptedException ex) { // on assignTask()
218:                        if (log.isTraceEnabled())
219:                            log.trace("interrupt on ASSIGN");
220:                    }
221:                    if (thread == null)
222:                        return; // we need to terminate
223:
224:                    try {
225:                        if (log.isTraceEnabled())
226:                            log.trace("entering SUSPEND");
227:                        synchronized (this ) {
228:                            if (log.isTraceEnabled())
229:                                log.trace("entered SUSPEND (suspended="
230:                                        + suspended + ", task="
231:                                        + printObj(task) + ')');
232:                            while (suspended && thread != null) { // second wait-loop: wait for thread to resume (resume())
233:                                if (log.isTraceEnabled())
234:                                    log.trace("wait SUSPEND");
235:                                wait();
236:                                if (log.isTraceEnabled())
237:                                    log.trace("wait SUSPEND completed");
238:                            }
239:                        }
240:                    } catch (InterruptedException ex) { // on resume()
241:                        if (log.isTraceEnabled())
242:                            log.trace("interrupt on RESUME");
243:                    }
244:                    if (thread == null)
245:                        return; // we need to terminate
246:
247:                    if (task != null) {
248:                        if (log.isTraceEnabled())
249:                            log.trace("setting ThreadLocal(s)");
250:                        if (tl_listener != null)
251:                            tl_listener.setThreadLocal();
252:                        if (log.isTraceEnabled())
253:                            log.trace("running task");
254:                        try {
255:                            task.run(); //here we are actually running the task
256:                        } catch (Throwable ex) {
257:                            if (log.isErrorEnabled())
258:                                log.error("failed running task", ex);
259:                        } finally {
260:                            if (log.isTraceEnabled())
261:                                log.trace("resetting ThreadLocal(s)");
262:                            if (tl_listener != null)
263:                                tl_listener.resetThreadLocal();
264:                            tl_listener = null;
265:                        }
266:                        if (log.isTraceEnabled())
267:                            log.trace("task completed");
268:                    }
269:
270:                    if (log.isTraceEnabled())
271:                        log.trace("entering THIS");
272:                    synchronized (this ) {
273:                        if (log.isTraceEnabled())
274:                            log.trace("entered THIS");
275:                        task = null;
276:                        if (log.isTraceEnabled())
277:                            log.trace("notify THIS");
278:                        notifyAll();
279:                        if (log.isTraceEnabled())
280:                            log.trace("notify THIS completed");
281:                    }
282:                }
283:                if (log.isTraceEnabled())
284:                    log.trace("terminated");
285:            }
286:
287:            String printObj(Object obj) {
288:                if (obj == null)
289:                    return "null";
290:                else
291:                    return "non-null";
292:            }
293:
294:            public void waitUntilDone() {
295:
296:                if (log.isTraceEnabled())
297:                    log.trace("entering THIS");
298:                synchronized (this ) {
299:                    if (log.isTraceEnabled())
300:                        log.trace("entered THIS (task=" + printObj(task) + ')');
301:                    while (task != null) {
302:                        try {
303:                            if (log.isTraceEnabled())
304:                                log.trace("wait THIS");
305:                            wait();
306:                            if (log.isTraceEnabled())
307:                                log.trace("wait THIS completed");
308:                        } catch (InterruptedException interrupted) {
309:                        }
310:                    }
311:                }
312:            }
313:
314:            public String toString() {
315:                return "suspended=" + suspended;
316:            }
317:
318:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.