001: /*
002: * (c) Copyright 2007 by Volker Bergmann. All rights reserved.
003: *
004: * Redistribution and use in source and binary forms, with or without
005: * modification, is permitted under the terms of the
006: * GNU General Public License.
007: *
008: * For redistributing this software or a derivative work under a license other
009: * than the GPL-compatible Free Software License as defined by the Free
010: * Software Foundation or approved by OSI, you must first obtain a commercial
011: * license to this software product from Volker Bergmann.
012: *
013: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
014: * WITHOUT A WARRANTY OF ANY KIND. ALL EXPRESS OR IMPLIED CONDITIONS,
015: * REPRESENTATIONS AND WARRANTIES, INCLUDING ANY IMPLIED WARRANTY OF
016: * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT, ARE
017: * HEREBY EXCLUDED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
018: * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
019: * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
020: * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
021: * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
022: * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
023: * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
024: * POSSIBILITY OF SUCH DAMAGE.
025: */
026:
027: package org.databene.task;
028:
029: import org.apache.commons.logging.Log;
030: import org.apache.commons.logging.LogFactory;
031: import org.databene.commons.ConfigurationError;
032:
033: import java.util.concurrent.CountDownLatch;
034: import java.util.concurrent.ExecutorService;
035: import java.util.concurrent.Executors;
036: import java.lang.reflect.Method;
037: import java.lang.reflect.InvocationTargetException;
038:
039: /**
040: * Task implementation that provides for repeated, paged, multithreaded Task execution.<br/>
041: * <br/>
042: * Created: 16.07.2007 19:25:30
043: * @author Volker Bergmann
044: */
045: public class PagedTask extends AbstractTask implements
046: Thread.UncaughtExceptionHandler {
047:
048: private static final Log logger = LogFactory
049: .getLog(PagedTask.class);
050:
051: protected Task realTask;
052: private PageListener listener;
053:
054: private long totalInvocations;
055: private long pageSize;
056: private int threadCount;
057:
058: private ExecutorService executor;
059:
060: private Throwable exception;
061:
062: // constructors ----------------------------------------------------------------------------------------------------
063:
064: public PagedTask() {
065: this (null);
066: }
067:
068: public PagedTask(Task realTask) {
069: this (realTask, 1);
070: }
071:
072: public PagedTask(Task realTask, long totalInvocations) {
073: this (realTask, totalInvocations, null, 1);
074: }
075:
076: public PagedTask(Task realTask, long totalInvocations,
077: PageListener listener, long pageSize) {
078: this (realTask, totalInvocations, listener, pageSize, 1,
079: Executors.newSingleThreadExecutor());
080: }
081:
082: public PagedTask(Task realTask, long totalInvocations,
083: PageListener listener, long pageSize, int threads,
084: ExecutorService executor) {
085: this .realTask = realTask;
086: this .listener = listener;
087: this .totalInvocations = totalInvocations;
088: this .pageSize = pageSize;
089: this .threadCount = threads;
090: this .executor = executor;
091: }
092:
093: // Task implementation ---------------------------------------------------------------------------------------------
094:
095: /**
096: * @return the totalInvocations
097: */
098: public long getTotalInvocations() {
099: return totalInvocations;
100: }
101:
102: /**
103: * @return the pageSize
104: */
105: public long getPageSize() {
106: return pageSize;
107: }
108:
109: /**
110: * @return the threadCount
111: */
112: public int getThreadCount() {
113: return threadCount;
114: }
115:
116: public void run() {
117: if (totalInvocations == 0)
118: return;
119: this .exception = null;
120: int invocationCount = 0;
121: if (logger.isDebugEnabled())
122: logger.debug("Running PagedTask[" + getTaskName() + "]");
123: int currentPageNo = 0;
124: while (workPending(currentPageNo)) {
125: pageStarting(currentPageNo);
126: long currentPageSize = (totalInvocations < 0 ? pageSize
127: : Math.min(pageSize, totalInvocations
128: - invocationCount));
129: if (threadCount > 1)
130: invocationCount += runMultiThreaded(currentPageNo,
131: currentPageSize);
132: else
133: invocationCount += runSingleThreaded(currentPageSize);
134: pageFinished(currentPageNo);
135: if (exception != null)
136: throw new RuntimeException(exception);
137: currentPageNo++;
138: }
139: if (logger.isDebugEnabled())
140: logger.debug("PagedTask " + getTaskName() + " finished");
141: }
142:
143: private long runMultiThreaded(int currentPageNo,
144: long currentPageSize) {
145: long localInvocationCount = 0;
146: int maxLoopsPerPage = (int) ((currentPageSize + threadCount - 1) / threadCount);
147: int shorterLoops = (int) (threadCount * maxLoopsPerPage - currentPageSize);
148: if (realTask instanceof ThreadSafe)
149: realTask.init(context);
150: // create threads for a page
151: CountDownLatch latch = new CountDownLatch(threadCount);
152: for (int threadNo = 0; threadNo < threadCount; threadNo++) {
153: int loopSize = maxLoopsPerPage;
154: if (totalInvocations >= 0
155: && threadNo >= threadCount - shorterLoops)
156: loopSize--;
157: if (loopSize > 0) {
158: Task task = realTask;
159: if (threadCount > 1) {
160: if (!(task instanceof ThreadSafe)) {
161: if (task instanceof Parallelizable)
162: task = cloneTask((Parallelizable) task);
163: else
164: throw new ConfigurationError(
165: "Since the task is not marked as thread-safe,"
166: + "it must either be used single-threaded "
167: + "or implement the Parallelizable interface");
168: }
169: }
170: task = new LoopedTask(task, loopSize); // TODO v0.4.2 leave the loop if generator has become unavailable
171: TaskRunnable thread = new TaskRunnable(task,
172: (realTask instanceof ThreadSafe ? null
173: : context), latch);
174: executor.execute(thread);
175: localInvocationCount += loopSize;
176: } else
177: latch.countDown();
178: }
179:
180: if (logger.isDebugEnabled())
181: logger.debug("Waiting for end of page "
182: + (currentPageNo + 1) + " of " + getTaskName()
183: + "...");
184: try {
185: latch.await();
186: } catch (InterruptedException e) {
187: e.printStackTrace();
188: }
189: if (realTask instanceof ThreadSafe)
190: realTask.destroy();
191: return localInvocationCount;
192: }
193:
194: private long runSingleThreaded(long currentPageSize) {
195: Task task = new LoopedTask(realTask, currentPageSize);
196: task.init(context);
197: task.run();
198: task.destroy();
199: return currentPageSize;
200: }
201:
202: protected boolean workPending(int currentPageNo) {
203: if (!realTask.wantsToRun())
204: return false;
205: long pages = (totalInvocations >= 0 ? (totalInvocations
206: + pageSize - 1)
207: / pageSize : -1);
208: return pages < 0 || currentPageNo < pages;
209: }
210:
211: private Task cloneTask(Parallelizable task) {
212: try {
213: Method cloneMethod = task.getClass().getMethod("clone");
214: return (Task) cloneMethod.invoke(task);
215: } catch (NoSuchMethodException e) {
216: throw new RuntimeException("Unexpected exception", e); // This is not supposed to happen
217: } catch (IllegalAccessException e) {
218: throw new RuntimeException("Unexpected exception", e); // This is not supposed to happen
219: } catch (InvocationTargetException e) {
220: throw new RuntimeException(
221: "Execption occured in clone() method", e);
222: }
223: }
224:
225: protected void pageStarting(int currentPageNo) {
226: if (logger.isDebugEnabled())
227: logger.debug("Starting page " + (currentPageNo + 1)
228: + " of " + getTaskName() + " with pagesize="
229: + pageSize);
230: if (listener != null)
231: listener.pageStarting(currentPageNo, -1);
232: }
233:
234: protected void pageFinished(int currentPageNo) {
235: if (logger.isDebugEnabled())
236: logger.debug("Page " + (currentPageNo + 1) + " of "
237: + getTaskName() + " finished");
238: if (listener != null)
239: listener.pageFinished(currentPageNo, -1);
240: }
241:
242: public void uncaughtException(Thread t, Throwable e) {
243: this.exception = e;
244: }
245: }
|