001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: *
017: */
018: package org.apache.tools.ant.taskdefs;
019:
020: import java.lang.reflect.Method;
021: import java.util.Enumeration;
022: import java.util.Vector;
023: import java.util.List;
024: import java.util.ArrayList;
025: import org.apache.tools.ant.BuildException;
026: import org.apache.tools.ant.Location;
027: import org.apache.tools.ant.Task;
028: import org.apache.tools.ant.TaskContainer;
029: import org.apache.tools.ant.util.StringUtils;
030:
031: /**
032: * Executes the contained tasks in separate threads, continuing
033: * once all are completed.
034: * <p>
035: * New behavior allows for the ant script to specify a maximum number of
036: * threads that will be executed in parallel. One should be very careful about
037: * using the <code>waitFor</code> task when specifying <code>threadCount</code>
038: * as it can cause deadlocks if the number of threads is too small or if one of
039: * the nested tasks fails to execute completely. The task selection algorithm
040: * will insure that the tasks listed before a task have started before that
041: * task is started, but it will not insure a successful completion of those
042: * tasks or that those tasks will finish first (i.e. it's a classic race
043: * condition).
044: * </p>
045: * @since Ant 1.4
046: *
047: * @ant.task category="control"
048: */
049: public class Parallel extends Task implements TaskContainer {
050:
051: /** Class which holds a list of tasks to execute */
052: public static class TaskList implements TaskContainer {
053: /** Collection holding the nested tasks */
054: private List tasks = new ArrayList();
055:
056: /**
057: * Add a nested task to execute parallel (asynchron).
058: * <p>
059: * @param nestedTask Nested task to be executed in parallel.
060: * must not be null.
061: */
062: public void addTask(Task nestedTask) {
063: tasks.add(nestedTask);
064: }
065: }
066:
067: /** Collection holding the nested tasks */
068: private Vector nestedTasks = new Vector();
069:
070: /** Semaphore to notify of completed threads */
071: private final Object semaphore = new Object();
072:
073: /** Total number of threads to run */
074: private int numThreads = 0;
075:
076: /** Total number of threads per processor to run. */
077: private int numThreadsPerProcessor = 0;
078:
079: /** The timeout period in milliseconds */
080: private long timeout;
081:
082: /** Indicates threads are still running and new threads can be issued */
083: private volatile boolean stillRunning;
084:
085: /** Indicates that the execution timedout */
086: private boolean timedOut;
087:
088: /**
089: * Indicates whether failure of any of the nested tasks should end
090: * execution
091: */
092: private boolean failOnAny;
093:
094: /** The dameon task list if any */
095: private TaskList daemonTasks;
096:
097: /** Accumulation of exceptions messages from all nested tasks */
098: private StringBuffer exceptionMessage;
099:
100: /** Number of exceptions from nested tasks */
101: private int numExceptions = 0;
102:
103: /** The first exception encountered */
104: private Throwable firstException;
105:
106: /** The location of the first exception */
107: private Location firstLocation;
108:
109: /**
110: * Add a group of daemon threads
111: * @param daemonTasks The tasks to be executed as daemon.
112: */
113: public void addDaemons(TaskList daemonTasks) {
114: if (this .daemonTasks != null) {
115: throw new BuildException(
116: "Only one daemon group is supported");
117: }
118: this .daemonTasks = daemonTasks;
119: }
120:
121: /**
122: * Interval to poll for completed threads when threadCount or
123: * threadsPerProcessor is specified. Integer in milliseconds.; optional
124: *
125: * @param pollInterval New value of property pollInterval.
126: */
127: public void setPollInterval(int pollInterval) {
128: }
129:
130: /**
131: * Control whether a failure in a nested task halts execution. Note that
132: * the task will complete but existing threads will continue to run - they
133: * are not stopped
134: *
135: * @param failOnAny if true any nested task failure causes parallel to
136: * complete.
137: */
138: public void setFailOnAny(boolean failOnAny) {
139: this .failOnAny = failOnAny;
140: }
141:
142: /**
143: * Add a nested task to execute in parallel.
144: * @param nestedTask Nested task to be executed in parallel
145: */
146: public void addTask(Task nestedTask) {
147: nestedTasks.addElement(nestedTask);
148: }
149:
150: /**
151: * Dynamically generates the number of threads to execute based on the
152: * number of available processors (via
153: * <code>java.lang.Runtime.availableProcessors()</code>). Requires a J2SE
154: * 1.4 VM, and it will overwrite the value set in threadCount.
155: * If used in a 1.1, 1.2, or 1.3 VM then the task will defer to
156: * <code>threadCount</code>.; optional
157: * @param numThreadsPerProcessor Number of threads to create per available
158: * processor.
159: *
160: */
161: public void setThreadsPerProcessor(int numThreadsPerProcessor) {
162: this .numThreadsPerProcessor = numThreadsPerProcessor;
163: }
164:
165: /**
166: * Statically determine the maximum number of tasks to execute
167: * simultaneously. If there are less tasks than threads then all will be
168: * executed at once, if there are more then only <code>threadCount</code>
169: * tasks will be executed at one time. If <code>threadsPerProcessor</code>
170: * is set and the JVM is at least a 1.4 VM then this value is
171: * ignored.; optional
172: *
173: * @param numThreads total number of threads.
174: *
175: */
176: public void setThreadCount(int numThreads) {
177: this .numThreads = numThreads;
178: }
179:
180: /**
181: * Sets the timeout on this set of tasks. If the timeout is reached
182: * before the other threads complete, the execution of this
183: * task completes with an exception.
184: *
185: * Note that existing threads continue to run.
186: *
187: * @param timeout timeout in milliseconds.
188: */
189: public void setTimeout(long timeout) {
190: this .timeout = timeout;
191: }
192:
193: /**
194: * Execute the parallel tasks
195: *
196: * @exception BuildException if any of the threads failed.
197: */
198: public void execute() throws BuildException {
199: updateThreadCounts();
200: if (numThreads == 0) {
201: numThreads = nestedTasks.size();
202: }
203: spinThreads();
204: }
205:
206: /**
207: * Determine the number of threads based on the number of processors
208: */
209: private void updateThreadCounts() {
210: if (numThreadsPerProcessor != 0) {
211: int numProcessors = getNumProcessors();
212: if (numProcessors != 0) {
213: numThreads = numProcessors * numThreadsPerProcessor;
214: }
215: }
216: }
217:
218: private void processExceptions(TaskRunnable[] runnables) {
219: if (runnables == null) {
220: return;
221: }
222: for (int i = 0; i < runnables.length; ++i) {
223: Throwable t = runnables[i].getException();
224: if (t != null) {
225: numExceptions++;
226: if (firstException == null) {
227: firstException = t;
228: }
229: if (t instanceof BuildException
230: && firstLocation == Location.UNKNOWN_LOCATION) {
231: firstLocation = ((BuildException) t).getLocation();
232: }
233: exceptionMessage.append(StringUtils.LINE_SEP);
234: exceptionMessage.append(t.getMessage());
235: }
236: }
237: }
238:
239: /**
240: * Spin up required threads with a maximum number active at any given time.
241: *
242: * @exception BuildException if any of the threads failed.
243: */
244: private void spinThreads() throws BuildException {
245: final int numTasks = nestedTasks.size();
246: TaskRunnable[] runnables = new TaskRunnable[numTasks];
247: stillRunning = true;
248: timedOut = false;
249:
250: int threadNumber = 0;
251: for (Enumeration e = nestedTasks.elements(); e
252: .hasMoreElements(); threadNumber++) {
253: Task nestedTask = (Task) e.nextElement();
254: runnables[threadNumber] = new TaskRunnable(nestedTask);
255: }
256:
257: final int maxRunning = numTasks < numThreads ? numTasks
258: : numThreads;
259: TaskRunnable[] running = new TaskRunnable[maxRunning];
260:
261: threadNumber = 0;
262: ThreadGroup group = new ThreadGroup("parallel");
263:
264: TaskRunnable[] daemons = null;
265: if (daemonTasks != null && daemonTasks.tasks.size() != 0) {
266: daemons = new TaskRunnable[daemonTasks.tasks.size()];
267: }
268:
269: synchronized (semaphore) {
270: // When we leave this block we can be sure all data is really
271: // stored in main memory before the new threads start, the new
272: // threads will for sure load the data from main memory.
273: //
274: // This probably is slightly paranoid.
275: }
276:
277: synchronized (semaphore) {
278: // start any daemon threads
279: if (daemons != null) {
280: for (int i = 0; i < daemons.length; ++i) {
281: daemons[i] = new TaskRunnable(
282: (Task) daemonTasks.tasks.get(i));
283: Thread daemonThread = new Thread(group, daemons[i]);
284: daemonThread.setDaemon(true);
285: daemonThread.start();
286: }
287: }
288:
289: // now run main threads in limited numbers...
290: // start initial batch of threads
291: for (int i = 0; i < maxRunning; ++i) {
292: running[i] = runnables[threadNumber++];
293: Thread thread = new Thread(group, running[i]);
294: thread.start();
295: }
296:
297: if (timeout != 0) {
298: // start the timeout thread
299: Thread timeoutThread = new Thread() {
300: public synchronized void run() {
301: try {
302: wait(timeout);
303: synchronized (semaphore) {
304: stillRunning = false;
305: timedOut = true;
306: semaphore.notifyAll();
307: }
308: } catch (InterruptedException e) {
309: // ignore
310: }
311: }
312: };
313: timeoutThread.start();
314: }
315:
316: // now find available running slots for the remaining threads
317: outer: while (threadNumber < numTasks && stillRunning) {
318: for (int i = 0; i < maxRunning; i++) {
319: if (running[i] == null || running[i].isFinished()) {
320: running[i] = runnables[threadNumber++];
321: Thread thread = new Thread(group, running[i]);
322: thread.start();
323: // continue on outer while loop to get another
324: // available slot
325: continue outer;
326: }
327: }
328:
329: // if we got here all slots in use, so sleep until
330: // something happens
331: try {
332: semaphore.wait();
333: } catch (InterruptedException ie) {
334: // doesn't java know interruptions are rude?
335: // just pretend it didn't happen and go about out business.
336: // sheesh!
337: }
338: }
339:
340: // are all threads finished
341: outer2: while (stillRunning) {
342: for (int i = 0; i < maxRunning; ++i) {
343: if (running[i] != null && !running[i].isFinished()) {
344: //System.out.println("Thread " + i + " is still alive ");
345: // still running - wait for it
346: try {
347: semaphore.wait();
348: } catch (InterruptedException ie) {
349: // who would interrupt me at a time like this?
350: }
351: continue outer2;
352: }
353: }
354: stillRunning = false;
355: }
356: }
357:
358: if (timedOut) {
359: throw new BuildException("Parallel execution timed out");
360: }
361:
362: // now did any of the threads throw an exception
363: exceptionMessage = new StringBuffer();
364: numExceptions = 0;
365: firstException = null;
366: firstLocation = Location.UNKNOWN_LOCATION;
367: processExceptions(daemons);
368: processExceptions(runnables);
369:
370: if (numExceptions == 1) {
371: if (firstException instanceof BuildException) {
372: throw (BuildException) firstException;
373: } else {
374: throw new BuildException(firstException);
375: }
376: } else if (numExceptions > 1) {
377: throw new BuildException(exceptionMessage.toString(),
378: firstLocation);
379: }
380: }
381:
382: /**
383: * Determine the number of processors. Only effective on later VMs
384: *
385: * @return the number of processors available or 0 if not determinable.
386: */
387: private int getNumProcessors() {
388: try {
389: Class[] paramTypes = {};
390: Method availableProcessors = Runtime.class.getMethod(
391: "availableProcessors", paramTypes);
392:
393: Object[] args = {};
394: Integer ret = (Integer) availableProcessors.invoke(Runtime
395: .getRuntime(), args);
396: return ret.intValue();
397: } catch (Exception e) {
398: // return a bogus number
399: return 0;
400: }
401: }
402:
403: /**
404: * thread that execs a task
405: */
406: private class TaskRunnable implements Runnable {
407: private Throwable exception;
408: private Task task;
409: private boolean finished;
410:
411: /**
412: * Construct a new TaskRunnable.<p>
413: *
414: * @param task the Task to be executed in a separate thread
415: */
416: TaskRunnable(Task task) {
417: this .task = task;
418: }
419:
420: /**
421: * Executes the task within a thread and takes care about
422: * Exceptions raised within the task.
423: */
424: public void run() {
425: try {
426: task.perform();
427: } catch (Throwable t) {
428: exception = t;
429: if (failOnAny) {
430: stillRunning = false;
431: }
432: } finally {
433: synchronized (semaphore) {
434: finished = true;
435: semaphore.notifyAll();
436: }
437: }
438: }
439:
440: /**
441: * get any exception that got thrown during execution;
442: * @return an exception or null for no exception/not yet finished
443: */
444: public Throwable getException() {
445: return exception;
446: }
447:
448: /**
449: * Provides the indicator that the task has been finished.
450: * @return Returns true when the task is finished.
451: */
452: boolean isFinished() {
453: return finished;
454: }
455: }
456:
457: }
|