Source Code Cross Referenced for Parallel.java in  » Build » ANT » org » apache » tools » ant » taskdefs » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Build » ANT » org.apache.tools.ant.taskdefs 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         *  Licensed to the Apache Software Foundation (ASF) under one or more
003:         *  contributor license agreements.  See the NOTICE file distributed with
004:         *  this work for additional information regarding copyright ownership.
005:         *  The ASF licenses this file to You under the Apache License, Version 2.0
006:         *  (the "License"); you may not use this file except in compliance with
007:         *  the License.  You may obtain a copy of the License at
008:         *
009:         *      http://www.apache.org/licenses/LICENSE-2.0
010:         *
011:         *  Unless required by applicable law or agreed to in writing, software
012:         *  distributed under the License is distributed on an "AS IS" BASIS,
013:         *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014:         *  See the License for the specific language governing permissions and
015:         *  limitations under the License.
016:         *
017:         */
018:        package org.apache.tools.ant.taskdefs;
019:
020:        import java.lang.reflect.Method;
021:        import java.util.Enumeration;
022:        import java.util.Vector;
023:        import java.util.List;
024:        import java.util.ArrayList;
025:        import org.apache.tools.ant.BuildException;
026:        import org.apache.tools.ant.Location;
027:        import org.apache.tools.ant.Task;
028:        import org.apache.tools.ant.TaskContainer;
029:        import org.apache.tools.ant.util.StringUtils;
030:
031:        /**
032:         * Executes the contained tasks in separate threads, continuing
033:         * once all are completed.
034:         * <p>
035:         * New behavior allows for the ant script to specify a maximum number of
036:         * threads that will be executed in parallel.  One should be very careful about
037:         * using the <code>waitFor</code> task when specifying <code>threadCount</code>
038:         * as it can cause deadlocks if the number of threads is too small or if one of
039:         * the nested tasks fails to execute completely.  The task selection algorithm
040:         * will insure that the tasks listed before a task have started before that
041:         * task is started, but it will not insure a successful completion of those
042:         * tasks or that those tasks will finish first (i.e. it's a classic race
043:         * condition).
044:         * </p>
045:         * @since Ant 1.4
046:         *
047:         * @ant.task category="control"
048:         */
049:        public class Parallel extends Task implements  TaskContainer {
050:
051:            /** Class which holds a list of tasks to execute */
052:            public static class TaskList implements  TaskContainer {
053:                /** Collection holding the nested tasks */
054:                private List tasks = new ArrayList();
055:
056:                /**
057:                 * Add a nested task to execute parallel (asynchron).
058:                 * <p>
059:                 * @param nestedTask  Nested task to be executed in parallel.
060:                 *                    must not be null.
061:                 */
062:                public void addTask(Task nestedTask) {
063:                    tasks.add(nestedTask);
064:                }
065:            }
066:
067:            /** Collection holding the nested tasks */
068:            private Vector nestedTasks = new Vector();
069:
070:            /** Semaphore to notify of completed threads */
071:            private final Object semaphore = new Object();
072:
073:            /** Total number of threads to run */
074:            private int numThreads = 0;
075:
076:            /** Total number of threads per processor to run.  */
077:            private int numThreadsPerProcessor = 0;
078:
079:            /** The timeout period in milliseconds */
080:            private long timeout;
081:
082:            /** Indicates threads are still running and new threads can be issued */
083:            private volatile boolean stillRunning;
084:
085:            /** Indicates that the execution timedout */
086:            private boolean timedOut;
087:
088:            /**
089:             * Indicates whether failure of any of the nested tasks should end
090:             * execution
091:             */
092:            private boolean failOnAny;
093:
094:            /** The dameon task list if any */
095:            private TaskList daemonTasks;
096:
097:            /** Accumulation of exceptions messages from all nested tasks */
098:            private StringBuffer exceptionMessage;
099:
100:            /** Number of exceptions from nested tasks */
101:            private int numExceptions = 0;
102:
103:            /** The first exception encountered */
104:            private Throwable firstException;
105:
106:            /** The location of the first exception */
107:            private Location firstLocation;
108:
109:            /**
110:             * Add a group of daemon threads
111:             * @param daemonTasks The tasks to be executed as daemon.
112:             */
113:            public void addDaemons(TaskList daemonTasks) {
114:                if (this .daemonTasks != null) {
115:                    throw new BuildException(
116:                            "Only one daemon group is supported");
117:                }
118:                this .daemonTasks = daemonTasks;
119:            }
120:
121:            /**
122:             * Interval to poll for completed threads when threadCount or
123:             * threadsPerProcessor is specified.  Integer in milliseconds.; optional
124:             *
125:             * @param pollInterval New value of property pollInterval.
126:             */
127:            public void setPollInterval(int pollInterval) {
128:            }
129:
130:            /**
131:             * Control whether a failure in a nested task halts execution. Note that
132:             * the task will complete but existing threads will continue to run - they
133:             * are not stopped
134:             *
135:             * @param failOnAny if true any nested task failure causes parallel to
136:             *        complete.
137:             */
138:            public void setFailOnAny(boolean failOnAny) {
139:                this .failOnAny = failOnAny;
140:            }
141:
142:            /**
143:             * Add a nested task to execute in parallel.
144:             * @param nestedTask  Nested task to be executed in parallel
145:             */
146:            public void addTask(Task nestedTask) {
147:                nestedTasks.addElement(nestedTask);
148:            }
149:
150:            /**
151:             * Dynamically generates the number of threads to execute based on the
152:             * number of available processors (via
153:             * <code>java.lang.Runtime.availableProcessors()</code>). Requires a J2SE
154:             * 1.4 VM, and it will overwrite the value set in threadCount.
155:             * If used in a 1.1, 1.2, or 1.3 VM then the task will defer to
156:             * <code>threadCount</code>.; optional
157:             * @param numThreadsPerProcessor Number of threads to create per available
158:             *        processor.
159:             *
160:             */
161:            public void setThreadsPerProcessor(int numThreadsPerProcessor) {
162:                this .numThreadsPerProcessor = numThreadsPerProcessor;
163:            }
164:
165:            /**
166:             * Statically determine the maximum number of tasks to execute
167:             * simultaneously.  If there are less tasks than threads then all will be
168:             * executed at once, if there are more then only <code>threadCount</code>
169:             * tasks will be executed at one time.  If <code>threadsPerProcessor</code>
170:             * is set and the JVM is at least a 1.4 VM then this value is
171:             * ignored.; optional
172:             *
173:             * @param numThreads total number of threads.
174:             *
175:             */
176:            public void setThreadCount(int numThreads) {
177:                this .numThreads = numThreads;
178:            }
179:
180:            /**
181:             * Sets the timeout on this set of tasks. If the timeout is reached
182:             * before the other threads complete, the execution of this
183:             * task completes with an exception.
184:             *
185:             * Note that existing threads continue to run.
186:             *
187:             * @param timeout timeout in milliseconds.
188:             */
189:            public void setTimeout(long timeout) {
190:                this .timeout = timeout;
191:            }
192:
193:            /**
194:             * Execute the parallel tasks
195:             *
196:             * @exception BuildException if any of the threads failed.
197:             */
198:            public void execute() throws BuildException {
199:                updateThreadCounts();
200:                if (numThreads == 0) {
201:                    numThreads = nestedTasks.size();
202:                }
203:                spinThreads();
204:            }
205:
206:            /**
207:             * Determine the number of threads based on the number of processors
208:             */
209:            private void updateThreadCounts() {
210:                if (numThreadsPerProcessor != 0) {
211:                    int numProcessors = getNumProcessors();
212:                    if (numProcessors != 0) {
213:                        numThreads = numProcessors * numThreadsPerProcessor;
214:                    }
215:                }
216:            }
217:
218:            private void processExceptions(TaskRunnable[] runnables) {
219:                if (runnables == null) {
220:                    return;
221:                }
222:                for (int i = 0; i < runnables.length; ++i) {
223:                    Throwable t = runnables[i].getException();
224:                    if (t != null) {
225:                        numExceptions++;
226:                        if (firstException == null) {
227:                            firstException = t;
228:                        }
229:                        if (t instanceof  BuildException
230:                                && firstLocation == Location.UNKNOWN_LOCATION) {
231:                            firstLocation = ((BuildException) t).getLocation();
232:                        }
233:                        exceptionMessage.append(StringUtils.LINE_SEP);
234:                        exceptionMessage.append(t.getMessage());
235:                    }
236:                }
237:            }
238:
239:            /**
240:             * Spin up required threads with a maximum number active at any given time.
241:             *
242:             * @exception BuildException if any of the threads failed.
243:             */
244:            private void spinThreads() throws BuildException {
245:                final int numTasks = nestedTasks.size();
246:                TaskRunnable[] runnables = new TaskRunnable[numTasks];
247:                stillRunning = true;
248:                timedOut = false;
249:
250:                int threadNumber = 0;
251:                for (Enumeration e = nestedTasks.elements(); e
252:                        .hasMoreElements(); threadNumber++) {
253:                    Task nestedTask = (Task) e.nextElement();
254:                    runnables[threadNumber] = new TaskRunnable(nestedTask);
255:                }
256:
257:                final int maxRunning = numTasks < numThreads ? numTasks
258:                        : numThreads;
259:                TaskRunnable[] running = new TaskRunnable[maxRunning];
260:
261:                threadNumber = 0;
262:                ThreadGroup group = new ThreadGroup("parallel");
263:
264:                TaskRunnable[] daemons = null;
265:                if (daemonTasks != null && daemonTasks.tasks.size() != 0) {
266:                    daemons = new TaskRunnable[daemonTasks.tasks.size()];
267:                }
268:
269:                synchronized (semaphore) {
270:                    // When we leave this block we can be sure all data is really
271:                    // stored in main memory before the new threads start, the new
272:                    // threads will for sure load the data from main memory.
273:                    //
274:                    // This probably is slightly paranoid.
275:                }
276:
277:                synchronized (semaphore) {
278:                    // start any daemon threads
279:                    if (daemons != null) {
280:                        for (int i = 0; i < daemons.length; ++i) {
281:                            daemons[i] = new TaskRunnable(
282:                                    (Task) daemonTasks.tasks.get(i));
283:                            Thread daemonThread = new Thread(group, daemons[i]);
284:                            daemonThread.setDaemon(true);
285:                            daemonThread.start();
286:                        }
287:                    }
288:
289:                    // now run main threads in limited numbers...
290:                    // start initial batch of threads
291:                    for (int i = 0; i < maxRunning; ++i) {
292:                        running[i] = runnables[threadNumber++];
293:                        Thread thread = new Thread(group, running[i]);
294:                        thread.start();
295:                    }
296:
297:                    if (timeout != 0) {
298:                        // start the timeout thread
299:                        Thread timeoutThread = new Thread() {
300:                            public synchronized void run() {
301:                                try {
302:                                    wait(timeout);
303:                                    synchronized (semaphore) {
304:                                        stillRunning = false;
305:                                        timedOut = true;
306:                                        semaphore.notifyAll();
307:                                    }
308:                                } catch (InterruptedException e) {
309:                                    // ignore
310:                                }
311:                            }
312:                        };
313:                        timeoutThread.start();
314:                    }
315:
316:                    // now find available running slots for the remaining threads
317:                    outer: while (threadNumber < numTasks && stillRunning) {
318:                        for (int i = 0; i < maxRunning; i++) {
319:                            if (running[i] == null || running[i].isFinished()) {
320:                                running[i] = runnables[threadNumber++];
321:                                Thread thread = new Thread(group, running[i]);
322:                                thread.start();
323:                                // continue on outer while loop to get another
324:                                // available slot
325:                                continue outer;
326:                            }
327:                        }
328:
329:                        // if we got here all slots in use, so sleep until
330:                        // something happens
331:                        try {
332:                            semaphore.wait();
333:                        } catch (InterruptedException ie) {
334:                            // doesn't java know interruptions are rude?
335:                            // just pretend it didn't happen and go about out business.
336:                            // sheesh!
337:                        }
338:                    }
339:
340:                    // are all threads finished
341:                    outer2: while (stillRunning) {
342:                        for (int i = 0; i < maxRunning; ++i) {
343:                            if (running[i] != null && !running[i].isFinished()) {
344:                                //System.out.println("Thread " + i + " is still alive ");
345:                                // still running - wait for it
346:                                try {
347:                                    semaphore.wait();
348:                                } catch (InterruptedException ie) {
349:                                    // who would interrupt me at a time like this?
350:                                }
351:                                continue outer2;
352:                            }
353:                        }
354:                        stillRunning = false;
355:                    }
356:                }
357:
358:                if (timedOut) {
359:                    throw new BuildException("Parallel execution timed out");
360:                }
361:
362:                // now did any of the threads throw an exception
363:                exceptionMessage = new StringBuffer();
364:                numExceptions = 0;
365:                firstException = null;
366:                firstLocation = Location.UNKNOWN_LOCATION;
367:                processExceptions(daemons);
368:                processExceptions(runnables);
369:
370:                if (numExceptions == 1) {
371:                    if (firstException instanceof  BuildException) {
372:                        throw (BuildException) firstException;
373:                    } else {
374:                        throw new BuildException(firstException);
375:                    }
376:                } else if (numExceptions > 1) {
377:                    throw new BuildException(exceptionMessage.toString(),
378:                            firstLocation);
379:                }
380:            }
381:
382:            /**
383:             * Determine the number of processors. Only effective on later VMs
384:             *
385:             * @return the number of processors available or 0 if not determinable.
386:             */
387:            private int getNumProcessors() {
388:                try {
389:                    Class[] paramTypes = {};
390:                    Method availableProcessors = Runtime.class.getMethod(
391:                            "availableProcessors", paramTypes);
392:
393:                    Object[] args = {};
394:                    Integer ret = (Integer) availableProcessors.invoke(Runtime
395:                            .getRuntime(), args);
396:                    return ret.intValue();
397:                } catch (Exception e) {
398:                    // return a bogus number
399:                    return 0;
400:                }
401:            }
402:
403:            /**
404:             * thread that execs a task
405:             */
406:            private class TaskRunnable implements  Runnable {
407:                private Throwable exception;
408:                private Task task;
409:                private boolean finished;
410:
411:                /**
412:                 * Construct a new TaskRunnable.<p>
413:                 *
414:                 * @param task the Task to be executed in a separate thread
415:                 */
416:                TaskRunnable(Task task) {
417:                    this .task = task;
418:                }
419:
420:                /**
421:                 * Executes the task within a thread and takes care about
422:                 * Exceptions raised within the task.
423:                 */
424:                public void run() {
425:                    try {
426:                        task.perform();
427:                    } catch (Throwable t) {
428:                        exception = t;
429:                        if (failOnAny) {
430:                            stillRunning = false;
431:                        }
432:                    } finally {
433:                        synchronized (semaphore) {
434:                            finished = true;
435:                            semaphore.notifyAll();
436:                        }
437:                    }
438:                }
439:
440:                /**
441:                 * get any exception that got thrown during execution;
442:                 * @return an exception or null for no exception/not yet finished
443:                 */
444:                public Throwable getException() {
445:                    return exception;
446:                }
447:
448:                /**
449:                 * Provides the indicator that the task has been finished.
450:                 * @return Returns true when the task is finished.
451:                 */
452:                boolean isFinished() {
453:                    return finished;
454:                }
455:            }
456:
457:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.