001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013: * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014: * License for the specific language governing permissions and limitations
015: * under the License.
016: *
017: */
018:
019: package org.apache.jmeter.engine;
020:
021: import java.io.PrintWriter;
022: import java.io.Serializable;
023: import java.io.StringWriter;
024: import java.util.ArrayList;
025: import java.util.Date;
026: import java.util.HashMap;
027: import java.util.HashSet;
028: import java.util.Iterator;
029: import java.util.LinkedList;
030: import java.util.List;
031: import java.util.Map;
032: import java.util.Properties;
033:
034: import org.apache.jmeter.testbeans.TestBean;
035: import org.apache.jmeter.testbeans.TestBeanHelper;
036: import org.apache.jmeter.testelement.TestElement;
037: import org.apache.jmeter.testelement.TestListener;
038: import org.apache.jmeter.testelement.TestPlan;
039: import org.apache.jmeter.threads.JMeterContextService;
040: import org.apache.jmeter.threads.JMeterThread;
041: import org.apache.jmeter.threads.JMeterThreadMonitor;
042: import org.apache.jmeter.threads.ListenerNotifier;
043: import org.apache.jmeter.threads.TestCompiler;
044: import org.apache.jmeter.threads.ThreadGroup;
045: import org.apache.jmeter.util.JMeterUtils;
046: import org.apache.jorphan.collections.HashTree;
047: import org.apache.jorphan.collections.ListedHashTree;
048: import org.apache.jorphan.collections.SearchByClass;
049: import org.apache.jorphan.logging.LoggingManager;
050: import org.apache.log.Logger;
051:
052: /**
053: */
054: public class StandardJMeterEngine implements JMeterEngine,
055: JMeterThreadMonitor, Runnable, Serializable {
056: private static final Logger log = LoggingManager
057: .getLoggerForClass();
058:
059: private static final long serialVersionUID = 231L; // Remember to change this when the class changes ...
060:
061: private transient Thread runningThread;
062:
063: private static long WAIT_TO_DIE = 5 * 1000; // 5 seconds
064:
065: private transient Map allThreads;
066:
067: private volatile boolean startingGroups; // flag to show that groups are still being created
068:
069: private boolean running = false;
070:
071: private boolean serialized = false;
072:
073: private volatile boolean schcdule_run = false;
074:
075: private HashTree test;
076:
077: private transient SearchByClass testListeners;
078:
079: private String host = null;
080:
081: private transient ListenerNotifier notifier;
082:
083: // Should we exit at end of the test? (only applies to server, because host is non-null)
084: private static final boolean exitAfterTest = JMeterUtils
085: .getPropDefault("server.exitaftertest", false); // $NON-NLS-1$
086:
087: private static final boolean startListenersLater = JMeterUtils
088: .getPropDefault("jmeterengine.startlistenerslater", true); // $NON-NLS-1$
089:
090: static {
091: if (startListenersLater) {
092: log
093: .info("Listeners will be started after enabling running version");
094: log
095: .info("To revert to the earlier behaviour, define jmeterengine.startlistenerslater=false");
096: }
097: }
098: // Allow engine and threads to be stopped from outside a thread
099: // e.g. from beanshell server
100: // Assumes that there is only one instance of the engine
101: // at any one time so it is not guaranteed to work ...
102: private static transient Map allThreadNames;
103:
104: private static StandardJMeterEngine engine;
105:
106: private static Map allThreadsSave;
107:
108: public static void stopEngineNow() {
109: if (engine != null) // May be null if called from Unit test
110: engine.stopTest(true);
111: }
112:
113: public static void stopEngine() {
114: if (engine != null) // May be null if called from Unit test
115: engine.stopTest(false);
116: }
117:
118: /*
119: * Allow functions etc to register for testStopped notification
120: */
121: private static final List testList = new ArrayList();
122:
123: public static synchronized void register(TestListener tl) {
124: testList.add(tl);
125: }
126:
127: public static boolean stopThread(String threadName) {
128: return stopThread(threadName, false);
129: }
130:
131: public static boolean stopThreadNow(String threadName) {
132: return stopThread(threadName, true);
133: }
134:
135: private static boolean stopThread(String threadName, boolean now) {
136: if (allThreadNames == null)
137: return false;// e.g. not yet started
138: JMeterThread thrd;
139: try {
140: thrd = (JMeterThread) allThreadNames.get(threadName);
141: } catch (Exception e) {
142: log.warn("stopThread: " + e);
143: return false;
144: }
145: if (thrd != null) {
146: thrd.stop();
147: if (now) {
148: Thread t = (Thread) allThreadsSave.get(thrd);
149: if (t != null) {
150: t.interrupt();
151: }
152: }
153: return true;
154: }
155: return false;
156: }
157:
158: // End of code to allow engine to be controlled remotely
159:
160: public StandardJMeterEngine() {
161: allThreads = new HashMap();
162: engine = this ;
163: allThreadNames = new HashMap();
164: allThreadsSave = allThreads;
165: }
166:
167: public StandardJMeterEngine(String host) {
168: this ();
169: this .host = host;
170: }
171:
172: public void configure(HashTree testTree) {
173: test = testTree;
174: }
175:
176: public void setHost(String host) {
177: this .host = host;
178: }
179:
180: protected HashTree getTestTree() {
181: return test;
182: }
183:
184: protected void compileTree() {
185: PreCompiler compiler = new PreCompiler();
186: getTestTree().traverse(compiler);
187: }
188:
189: // TODO: in Java1.5, perhaps we can use Thread.setUncaughtExceptionHandler() instead
190: private static class MyThreadGroup extends java.lang.ThreadGroup {
191: public MyThreadGroup(String s) {
192: super (s);
193: }
194:
195: public void uncaughtException(Thread t, Throwable e) {
196: if (!(e instanceof ThreadDeath)) {
197: log.error("Uncaught exception: ", e);
198: System.err.println("Uncaught Exception " + e
199: + ". See log file for details.");
200: }
201: }
202:
203: }
204:
205: public void runTest() throws JMeterEngineException {
206: try {
207: runningThread = new Thread(new MyThreadGroup(
208: "JMeterThreadGroup"), this );
209: runningThread.start();
210: } catch (Exception err) {
211: stopTest();
212: StringWriter string = new StringWriter();
213: PrintWriter writer = new PrintWriter(string);
214: err.printStackTrace(writer);
215: throw new JMeterEngineException(string.toString());
216: }
217: }
218:
219: private void removeThreadGroups(List elements) {
220: Iterator iter = elements.iterator();
221: while (iter.hasNext()) {
222: Object item = iter.next();
223: if (item instanceof ThreadGroup) {
224: iter.remove();
225: } else if (!(item instanceof TestElement)) {
226: iter.remove();
227: }
228: }
229: }
230:
231: protected void notifyTestListenersOfStart() {
232: Iterator iter = testListeners.getSearchResults().iterator();
233: while (iter.hasNext()) {
234: TestListener tl = (TestListener) iter.next();
235: if (tl instanceof TestBean)
236: TestBeanHelper.prepare((TestElement) tl);
237: if (host == null) {
238: tl.testStarted();
239: } else {
240: tl.testStarted(host);
241: }
242: }
243: }
244:
245: protected void notifyTestListenersOfEnd() {
246: log.info("Notifying test listeners of end of test");
247: Iterator iter = testListeners.getSearchResults().iterator();
248: while (iter.hasNext()) {
249: TestListener tl = (TestListener) iter.next();
250: if (tl instanceof TestBean)
251: TestBeanHelper.prepare((TestElement) tl);
252: if (host == null) {
253: tl.testEnded();
254: } else {
255: tl.testEnded(host);
256: }
257: }
258: log.info("Test has ended");
259: if (host != null) {
260: long now = System.currentTimeMillis();
261: System.out.println("Finished the test on host " + host
262: + " @ " + new Date(now) + " (" + now + ")");
263: if (exitAfterTest) {
264: exit();
265: }
266: }
267: }
268:
269: private ListedHashTree cloneTree(ListedHashTree tree) {
270: TreeCloner cloner = new TreeCloner(true);
271: tree.traverse(cloner);
272: return cloner.getClonedTree();
273: }
274:
275: public void reset() {
276: if (running) {
277: stopTest();
278: }
279: }
280:
281: public synchronized void threadFinished(JMeterThread thread) {
282: try {
283: allThreads.remove(thread);
284: log.info("Ending thread " + thread.getThreadName());
285: if (!serialized && !schcdule_run && !startingGroups
286: && allThreads.size() == 0) {
287: log.info("Stopping test");
288: stopTest();
289: }
290: } catch (Throwable e) {
291: log
292: .fatalError(
293: "Call to threadFinished should never throw an exception - this can deadlock JMeter",
294: e);
295: }
296: }
297:
298: public synchronized void stopTest() {
299: Thread stopThread = new Thread(new StopTest());
300: stopThread.start();
301: }
302:
303: public synchronized void stopTest(boolean b) {
304: Thread stopThread = new Thread(new StopTest(b));
305: stopThread.start();
306: }
307:
308: private class StopTest implements Runnable {
309: boolean now;
310:
311: private StopTest() {
312: now = true;
313: }
314:
315: private StopTest(boolean b) {
316: now = b;
317: }
318:
319: public void run() {
320: if (running) {
321: running = false;
322: if (now) {
323: tellThreadsToStop();
324: } else {
325: stopAllThreads();
326: }
327: try {
328: Thread.sleep(10 * allThreads.size());
329: } catch (InterruptedException e) {
330: }
331: boolean stopped = verifyThreadsStopped();
332: if (stopped || now) {
333: notifyTestListenersOfEnd();
334: }
335: }
336: }
337: }
338:
339: public void run() {
340: log.info("Running the test!");
341: running = true;
342:
343: SearchByClass testPlan = new SearchByClass(TestPlan.class);
344: getTestTree().traverse(testPlan);
345: Object[] plan = testPlan.getSearchResults().toArray();
346: if (plan.length == 0) {
347: System.err.println("Could not find the TestPlan!");
348: log.error("Could not find the TestPlan!");
349: System.exit(1);
350: }
351: if (((TestPlan) plan[0]).isSerialized()) {
352: serialized = true;
353: }
354: JMeterContextService.startTest();
355: try {
356: compileTree();
357: } catch (RuntimeException e) {
358: log.error("Error occurred compiling the tree:", e);
359: JMeterUtils
360: .reportErrorToUser("Error occurred compiling the tree: - see log file");
361: return; // no point continuing
362: }
363: /**
364: * Notification of test listeners needs to happen after function
365: * replacement, but before setting RunningVersion to true.
366: */
367: testListeners = new SearchByClass(TestListener.class);
368: getTestTree().traverse(testListeners);
369:
370: // Merge in any additional test listeners
371: // currently only used by the function parser
372: testListeners.getSearchResults().addAll(testList);
373: testList.clear(); // no longer needed
374:
375: if (!startListenersLater)
376: notifyTestListenersOfStart();
377: getTestTree().traverse(new TurnElementsOn());
378: if (startListenersLater)
379: notifyTestListenersOfStart();
380:
381: List testLevelElements = new LinkedList(getTestTree().list(
382: getTestTree().getArray()[0]));
383: removeThreadGroups(testLevelElements);
384: SearchByClass searcher = new SearchByClass(ThreadGroup.class);
385: getTestTree().traverse(searcher);
386: TestCompiler.initialize();
387: // for each thread group, generate threads
388: // hand each thread the sampler controller
389: // and the listeners, and the timer
390: Iterator iter = searcher.getSearchResults().iterator();
391:
392: /*
393: * Here's where the test really starts. Run a Full GC now: it's no harm
394: * at all (just delays test start by a tiny amount) and hitting one too
395: * early in the test can impair results for short tests.
396: */
397: System.gc();
398:
399: notifier = new ListenerNotifier();
400:
401: schcdule_run = true;
402: JMeterContextService.getContext().setSamplingStarted(true);
403: int groupCount = 0;
404: JMeterContextService.clearTotalThreads();
405: startingGroups = true;
406: while (iter.hasNext()) {
407: groupCount++;
408: ThreadGroup group = (ThreadGroup) iter.next();
409: int numThreads = group.getNumThreads();
410: JMeterContextService.addTotalThreads(numThreads);
411: boolean onErrorStopTest = group.getOnErrorStopTest();
412: boolean onErrorStopThread = group.getOnErrorStopThread();
413: String groupName = group.getName();
414: int rampUp = group.getRampUp();
415: float perThreadDelay = ((float) (rampUp * 1000) / (float) numThreads);
416: log.info("Starting " + numThreads + " threads for group "
417: + groupName + ". Ramp up = " + rampUp + ".");
418:
419: if (onErrorStopTest) {
420: log.info("Test will stop on error");
421: } else if (onErrorStopThread) {
422: log.info("Thread will stop on error");
423: } else {
424: log.info("Continue on error");
425: }
426:
427: ListedHashTree threadGroupTree = (ListedHashTree) searcher
428: .getSubTree(group);
429: threadGroupTree.add(group, testLevelElements);
430: for (int i = 0; running && i < numThreads; i++) {
431: final JMeterThread jmeterThread = new JMeterThread(
432: cloneTree(threadGroupTree), this , notifier);
433: jmeterThread.setThreadNum(i);
434: jmeterThread.setThreadGroup(group);
435: jmeterThread.setInitialContext(JMeterContextService
436: .getContext());
437: jmeterThread
438: .setInitialDelay((int) (perThreadDelay * i));
439: jmeterThread.setThreadName(groupName + " "
440: + (groupCount) + "-" + (i + 1));
441:
442: scheduleThread(jmeterThread, group);
443:
444: // Set up variables for stop handling
445: jmeterThread.setEngine(this );
446: jmeterThread.setOnErrorStopTest(onErrorStopTest);
447: jmeterThread.setOnErrorStopThread(onErrorStopThread);
448:
449: Thread newThread = new Thread(jmeterThread);
450: newThread.setName(jmeterThread.getThreadName());
451: allThreads.put(jmeterThread, newThread);
452: if (serialized && !iter.hasNext()
453: && i == numThreads - 1) // last thread
454: {
455: serialized = false;
456: }
457: newThread.start();
458: }
459: schcdule_run = false;
460: if (serialized) {
461: while (running && allThreads.size() > 0) {
462: try {
463: Thread.sleep(1000);
464: } catch (InterruptedException e) {
465: }
466: }
467: }
468: }
469: startingGroups = false;
470: }
471:
472: /**
473: * This will schedule the time for the JMeterThread.
474: *
475: * @param thread
476: * @param group
477: */
478: private void scheduleThread(JMeterThread thread, ThreadGroup group) {
479: // if true the Scheduler is enabled
480: if (group.getScheduler()) {
481: long now = System.currentTimeMillis();
482: // set the start time for the Thread
483: if (group.getDelay() > 0) {// Duration is in seconds
484: thread.setStartTime(group.getDelay() * 1000 + now);
485: } else {
486: long start = group.getStartTime();
487: if (start < now)
488: start = now; // Force a sensible start time
489: thread.setStartTime(start);
490: }
491:
492: // set the endtime for the Thread
493: if (group.getDuration() > 0) {// Duration is in seconds
494: thread.setEndTime(group.getDuration() * 1000
495: + (thread.getStartTime()));
496: } else {
497: thread.setEndTime(group.getEndTime());
498: }
499:
500: // Enables the scheduler
501: thread.setScheduled(true);
502: }
503: }
504:
505: private boolean verifyThreadsStopped() {
506: boolean stoppedAll = true;
507: Iterator iter = new HashSet(allThreads.keySet()).iterator();
508: while (iter.hasNext()) {
509: Thread t = (Thread) allThreads.get(iter.next());
510: if (t != null && t.isAlive()) {
511: try {
512: t.join(WAIT_TO_DIE);
513: } catch (InterruptedException e) {
514: }
515: if (t.isAlive()) {
516: stoppedAll = false;
517: log.info("Thread won't die: " + t.getName());
518: }
519: }
520: }
521: return stoppedAll;
522: }
523:
524: private void tellThreadsToStop() {
525: Iterator iter = new HashSet(allThreads.keySet()).iterator();
526: while (iter.hasNext()) {
527: JMeterThread item = (JMeterThread) iter.next();
528: item.stop();
529: Thread t = (Thread) allThreads.get(item);
530: if (t != null) {
531: t.interrupt();
532: } else {
533: log.warn("Lost thread: " + item.getThreadName());
534: allThreads.remove(item);
535: }
536: }
537: }
538:
539: public void askThreadsToStop() {
540: engine.stopTest(false);
541: }
542:
543: private void stopAllThreads() {
544: Iterator iter = new HashSet(allThreads.keySet()).iterator();
545: while (iter.hasNext()) {
546: JMeterThread item = (JMeterThread) iter.next();
547: item.stop();
548: }
549: }
550:
551: // Remote exit
552: public void exit() {
553: // Needs to be run in a separate thread to allow RMI call to return OK
554: Thread t = new Thread() {
555: public void run() {
556: // log.info("Pausing");
557: try {
558: Thread.sleep(1000);
559: } catch (InterruptedException e) {
560: }
561: log.info("Bye");
562: System.exit(0);
563: }
564: };
565: log.info("Starting Closedown");
566: t.start();
567: }
568:
569: public void setProperties(Properties p) {
570: log.info("Applying properties " + p);
571: JMeterUtils.getJMeterProperties().putAll(p);
572: }
573: }
|