001: package org.apache.lucene.benchmark.byTask.tasks;
002:
003: /**
004: * Licensed to the Apache Software Foundation (ASF) under one or more
005: * contributor license agreements. See the NOTICE file distributed with
006: * this work for additional information regarding copyright ownership.
007: * The ASF licenses this file to You under the Apache License, Version 2.0
008: * (the "License"); you may not use this file except in compliance with
009: * the License. You may obtain a copy of the License at
010: *
011: * http://www.apache.org/licenses/LICENSE-2.0
012: *
013: * Unless required by applicable law or agreed to in writing, software
014: * distributed under the License is distributed on an "AS IS" BASIS,
015: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016: * See the License for the specific language governing permissions and
017: * limitations under the License.
018: */
019:
020: import java.util.ArrayList;
021: import java.util.Iterator;
022:
023: import org.apache.lucene.benchmark.byTask.PerfRunData;
024: import org.apache.lucene.benchmark.byTask.feeds.NoMoreDataException;
025:
026: /**
027: * Sequence of parallel or sequential tasks.
028: */
029: public class TaskSequence extends PerfTask {
030: public static int REPEAT_EXHAUST = -2;
031: private ArrayList tasks;
032: private int repetitions = 1;
033: private boolean parallel;
034: private TaskSequence parent;
035: private boolean letChildReport = true;
036: private int rate = 0;
037: private boolean perMin = false; // rate, if set, is, by default, be sec.
038: private String seqName;
039: private boolean exhausted = false;
040: private boolean resetExhausted = false;
041: private PerfTask[] tasksArray;
042: private boolean anyExhaustibleTasks;
043:
044: public TaskSequence(PerfRunData runData, String name,
045: TaskSequence parent, boolean parallel) {
046: super (runData);
047: name = (name != null ? name : (parallel ? "Par" : "Seq"));
048: setName(name);
049: setSequenceName();
050: this .parent = parent;
051: this .parallel = parallel;
052: tasks = new ArrayList();
053: }
054:
055: private void initTasksArray() {
056: if (tasksArray == null) {
057: final int numTasks = tasks.size();
058: tasksArray = new PerfTask[numTasks];
059: for (int k = 0; k < numTasks; k++) {
060: tasksArray[k] = (PerfTask) tasks.get(k);
061: anyExhaustibleTasks |= tasksArray[k] instanceof ResetInputsTask;
062: anyExhaustibleTasks |= tasksArray[k] instanceof TaskSequence;
063: }
064: }
065: }
066:
067: /**
068: * @return Returns the parallel.
069: */
070: public boolean isParallel() {
071: return parallel;
072: }
073:
074: /**
075: * @return Returns the repetitions.
076: */
077: public int getRepetitions() {
078: return repetitions;
079: }
080:
081: /**
082: * @param repetitions The repetitions to set.
083: * @throws Exception
084: */
085: public void setRepetitions(int repetitions) throws Exception {
086: this .repetitions = repetitions;
087: if (repetitions == REPEAT_EXHAUST) {
088: if (isParallel()) {
089: throw new Exception(
090: "REPEAT_EXHAUST is not allowed for parallel tasks");
091: }
092: if (getRunData().getConfig().get("doc.maker.forever", true)) {
093: throw new Exception(
094: "REPEAT_EXHAUST requires setting doc.maker.forever=false");
095: }
096: }
097: setSequenceName();
098: }
099:
100: /**
101: * @return Returns the parent.
102: */
103: public TaskSequence getParent() {
104: return parent;
105: }
106:
107: /*
108: * (non-Javadoc)
109: * @see org.apache.lucene.benchmark.byTask.tasks.PerfTask#doLogic()
110: */
111: public int doLogic() throws Exception {
112: exhausted = resetExhausted = false;
113: return (parallel ? doParallelTasks() : doSerialTasks());
114: }
115:
116: private int doSerialTasks() throws Exception {
117: if (rate > 0) {
118: return doSerialTasksWithRate();
119: }
120:
121: initTasksArray();
122: int count = 0;
123:
124: for (int k = 0; (repetitions == REPEAT_EXHAUST && !exhausted)
125: || k < repetitions; k++) {
126: for (int l = 0; l < tasksArray.length; l++)
127: try {
128: final PerfTask task = tasksArray[l];
129: count += task.runAndMaybeStats(letChildReport);
130: if (anyExhaustibleTasks)
131: updateExhausted(task);
132: } catch (NoMoreDataException e) {
133: exhausted = true;
134: }
135: }
136: return count;
137: }
138:
139: private int doSerialTasksWithRate() throws Exception {
140: initTasksArray();
141: long delayStep = (perMin ? 60000 : 1000) / rate;
142: long nextStartTime = System.currentTimeMillis();
143: int count = 0;
144: for (int k = 0; (repetitions == REPEAT_EXHAUST && !exhausted)
145: || k < repetitions; k++) {
146: for (int l = 0; l < tasksArray.length; l++) {
147: final PerfTask task = tasksArray[l];
148: long waitMore = nextStartTime
149: - System.currentTimeMillis();
150: if (waitMore > 0) {
151: //System.out.println("wait: "+waitMore+" for rate: "+ratePerMin+" (delayStep="+delayStep+")");
152: Thread.sleep(waitMore);
153: }
154: nextStartTime += delayStep; // this aims at avarage rate.
155: try {
156: count += task.runAndMaybeStats(letChildReport);
157: if (anyExhaustibleTasks)
158: updateExhausted(task);
159: } catch (NoMoreDataException e) {
160: exhausted = true;
161: }
162: }
163: }
164: return count;
165: }
166:
167: // update state regarding exhaustion.
168: private void updateExhausted(PerfTask task) {
169: if (task instanceof ResetInputsTask) {
170: exhausted = false;
171: resetExhausted = true;
172: } else if (task instanceof TaskSequence) {
173: TaskSequence t = (TaskSequence) task;
174: if (t.resetExhausted) {
175: exhausted = false;
176: resetExhausted = true;
177: t.resetExhausted = false;
178: } else {
179: exhausted |= t.exhausted;
180: }
181: }
182: }
183:
184: private int doParallelTasks() throws Exception {
185: initTasksArray();
186: final int count[] = { 0 };
187: Thread t[] = new Thread[repetitions * tasks.size()];
188: // prepare threads
189: int indx = 0;
190: for (int k = 0; k < repetitions; k++) {
191: for (int i = 0; i < tasksArray.length; i++) {
192: final PerfTask task = (PerfTask) tasksArray[i].clone();
193: t[indx++] = new Thread() {
194: public void run() {
195: int n;
196: try {
197: n = task.runAndMaybeStats(letChildReport);
198: updateExhausted(task);
199: synchronized (count) {
200: count[0] += n;
201: }
202: } catch (NoMoreDataException e) {
203: exhausted = true;
204: } catch (Exception e) {
205: throw new RuntimeException(e);
206: }
207: }
208: };
209: }
210: }
211: // run threads
212: startThreads(t);
213: // wait for all threads to complete
214: for (int i = 0; i < t.length; i++) {
215: t[i].join();
216: }
217: // return total count
218: return count[0];
219: }
220:
221: // run threads
222: private void startThreads(Thread[] t) throws InterruptedException {
223: if (rate > 0) {
224: startlThreadsWithRate(t);
225: return;
226: }
227: for (int i = 0; i < t.length; i++) {
228: t[i].start();
229: }
230: }
231:
232: // run threadsm with rate
233: private void startlThreadsWithRate(Thread[] t)
234: throws InterruptedException {
235: long delayStep = (perMin ? 60000 : 1000) / rate;
236: long nextStartTime = System.currentTimeMillis();
237: for (int i = 0; i < t.length; i++) {
238: long waitMore = nextStartTime - System.currentTimeMillis();
239: if (waitMore > 0) {
240: //System.out.println("thread wait: "+waitMore+" for rate: "+ratePerMin+" (delayStep="+delayStep+")");
241: Thread.sleep(waitMore);
242: }
243: nextStartTime += delayStep; // this aims at avarage rate of starting threads.
244: t[i].start();
245: }
246: }
247:
248: public void addTask(PerfTask task) {
249: tasks.add(task);
250: task.setDepth(getDepth() + 1);
251: }
252:
253: /* (non-Javadoc)
254: * @see java.lang.Object#toString()
255: */
256: public String toString() {
257: String padd = getPadding();
258: StringBuffer sb = new StringBuffer(super .toString());
259: sb.append(parallel ? " [" : " {");
260: sb.append(NEW_LINE);
261: for (Iterator it = tasks.iterator(); it.hasNext();) {
262: PerfTask task = (PerfTask) it.next();
263: sb.append(task.toString());
264: sb.append(NEW_LINE);
265: }
266: sb.append(padd);
267: sb.append(!letChildReport ? ">" : (parallel ? "]" : "}"));
268: if (repetitions > 1) {
269: sb.append(" * " + repetitions);
270: }
271: if (repetitions == REPEAT_EXHAUST) {
272: sb.append(" * EXHAUST");
273: }
274: if (rate > 0) {
275: sb.append(", rate: " + rate + "/"
276: + (perMin ? "min" : "sec"));
277: }
278: return sb.toString();
279: }
280:
281: /**
282: * Execute child tasks in a way that they do not report their time separately.
283: */
284: public void setNoChildReport() {
285: letChildReport = false;
286: for (Iterator it = tasks.iterator(); it.hasNext();) {
287: PerfTask task = (PerfTask) it.next();
288: if (task instanceof TaskSequence) {
289: ((TaskSequence) task).setNoChildReport();
290: }
291: }
292: }
293:
294: /**
295: * Returns the rate per minute: how many operations should be performed in a minute.
296: * If 0 this has no effect.
297: * @return the rate per min: how many operations should be performed in a minute.
298: */
299: public int getRate() {
300: return (perMin ? rate : 60 * rate);
301: }
302:
303: /**
304: * @param rate The rate to set.
305: */
306: public void setRate(int rate, boolean perMin) {
307: this .rate = rate;
308: this .perMin = perMin;
309: setSequenceName();
310: }
311:
312: private void setSequenceName() {
313: seqName = super .getName();
314: if (repetitions == REPEAT_EXHAUST) {
315: seqName += "_Exhaust";
316: } else if (repetitions > 1) {
317: seqName += "_" + repetitions;
318: }
319: if (rate > 0) {
320: seqName += "_" + rate + (perMin ? "/min" : "/sec");
321: }
322: if (parallel && seqName.toLowerCase().indexOf("par") < 0) {
323: seqName += "_Par";
324: }
325: }
326:
327: public String getName() {
328: return seqName; // overide to include more info
329: }
330:
331: /**
332: * @return Returns the tasks.
333: */
334: public ArrayList getTasks() {
335: return tasks;
336: }
337:
338: /* (non-Javadoc)
339: * @see java.lang.Object#clone()
340: */
341: protected Object clone() throws CloneNotSupportedException {
342: TaskSequence res = (TaskSequence) super .clone();
343: res.tasks = new ArrayList();
344: for (int i = 0; i < tasks.size(); i++) {
345: res.tasks.add(((PerfTask) tasks.get(i)).clone());
346: }
347: return res;
348: }
349:
350: }
|