001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: *
017: */
018:
019: package org.apache.jmeter.threads;
020:
021: import java.io.Serializable;
022: import java.util.ArrayList;
023: import java.util.Collection;
024: import java.util.Iterator;
025: import java.util.List;
026: import java.util.ListIterator;
027:
028: import org.apache.jmeter.assertions.Assertion;
029: import org.apache.jmeter.assertions.AssertionResult;
030: import org.apache.jmeter.control.Controller;
031: import org.apache.jmeter.control.TransactionSampler;
032: import org.apache.jmeter.engine.StandardJMeterEngine;
033: import org.apache.jmeter.engine.event.LoopIterationEvent;
034: import org.apache.jmeter.engine.event.LoopIterationListener;
035: import org.apache.jmeter.gui.GuiPackage;
036: import org.apache.jmeter.processor.PostProcessor;
037: import org.apache.jmeter.samplers.SampleEvent;
038: import org.apache.jmeter.samplers.SampleListener;
039: import org.apache.jmeter.samplers.SampleResult;
040: import org.apache.jmeter.samplers.Sampler;
041: import org.apache.jmeter.testbeans.TestBeanHelper;
042: import org.apache.jmeter.testelement.TestElement;
043: import org.apache.jmeter.testelement.TestListener;
044: import org.apache.jmeter.testelement.ThreadListener;
045: import org.apache.jmeter.timers.Timer;
046: import org.apache.jmeter.util.JMeterUtils;
047: import org.apache.jorphan.collections.HashTree;
048: import org.apache.jorphan.collections.HashTreeTraverser;
049: import org.apache.jorphan.collections.SearchByClass;
050: import org.apache.jorphan.logging.LoggingManager;
051: import org.apache.jorphan.util.JMeterStopTestException;
052: import org.apache.jorphan.util.JMeterStopThreadException;
053: import org.apache.log.Logger;
054:
055: /**
056: * The JMeter interface to the sampling process, allowing JMeter to see the
057: * timing, add listeners for sampling events and to stop the sampling process.
058: *
059: */
060: public class JMeterThread implements Runnable, Serializable {
061: private static final Logger log = LoggingManager
062: .getLoggerForClass();
063:
064: private static final long serialVersionUID = 23L; // Remember to change this when the class changes ...
065:
066: // NOT USED private static Map samplers = new HashMap();
067: private int initialDelay = 0;
068:
069: private Controller controller;
070:
071: private boolean running;
072:
073: private HashTree testTree;
074:
075: private transient TestCompiler compiler;
076:
077: private JMeterThreadMonitor monitor;
078:
079: private String threadName;
080:
081: private transient JMeterContext threadContext;
082:
083: private transient JMeterVariables threadVars;
084:
085: private Collection testListeners;
086:
087: private transient ListenerNotifier notifier;
088:
089: private int threadNum = 0;
090:
091: private long startTime = 0;
092:
093: private long endTime = 0;
094:
095: private boolean scheduler = false;
096:
097: // based on this scheduler is enabled or disabled
098:
099: private ThreadGroup threadGroup; // Gives access to parent thread
100: // threadGroup
101:
102: private StandardJMeterEngine engine = null; // For access to stop methods.
103:
104: private boolean onErrorStopTest;
105:
106: private boolean onErrorStopThread;
107:
108: public static final String PACKAGE_OBJECT = "JMeterThread.pack"; // $NON-NLS-1$
109:
110: public static final String LAST_SAMPLE_OK = "JMeterThread.last_sample_ok"; // $NON-NLS-1$
111:
112: public JMeterThread() {
113: }
114:
115: public JMeterThread(HashTree test, JMeterThreadMonitor monitor,
116: ListenerNotifier note) {
117: this .monitor = monitor;
118: threadVars = new JMeterVariables();
119: testTree = test;
120: compiler = new TestCompiler(testTree, threadVars);
121: controller = (Controller) testTree.getArray()[0];
122: SearchByClass threadListenerSearcher = new SearchByClass(
123: TestListener.class);
124: test.traverse(threadListenerSearcher);
125: testListeners = threadListenerSearcher.getSearchResults();
126: notifier = note;
127: running = true;
128: }
129:
130: public void setInitialContext(JMeterContext context) {
131: threadVars.putAll(context.getVariables());
132: }
133:
134: /**
135: * Checks whether the JMeterThread is Scheduled. author
136: * T.Elanjchezhiyan(chezhiyan@siptech.co.in)
137: */
138: public boolean isScheduled() {
139: return this .scheduler;
140: }
141:
142: /**
143: * Enable the scheduler for this JMeterThread. author
144: * T.Elanjchezhiyan(chezhiyan@siptech.co.in)
145: */
146: public void setScheduled(boolean sche) {
147: this .scheduler = sche;
148: }
149:
150: /**
151: * Set the StartTime for this Thread.
152: *
153: * @param stime
154: * the StartTime value. author
155: * T.Elanjchezhiyan(chezhiyan@siptech.co.in)
156: */
157: public void setStartTime(long stime) {
158: startTime = stime;
159: }
160:
161: /**
162: * Get the start time value.
163: *
164: * @return the start time value. author
165: * T.Elanjchezhiyan(chezhiyan@siptech.co.in)
166: */
167: public long getStartTime() {
168: return startTime;
169: }
170:
171: /**
172: * Set the EndTime for this Thread.
173: *
174: * @param etime
175: * the EndTime value. author
176: * T.Elanjchezhiyan(chezhiyan@siptech.co.in)
177: */
178: public void setEndTime(long etime) {
179: endTime = etime;
180: }
181:
182: /**
183: * Get the end time value.
184: *
185: * @return the end time value. author
186: * T.Elanjchezhiyan(chezhiyan@siptech.co.in)
187: */
188: public long getEndTime() {
189: return endTime;
190: }
191:
192: /**
193: * Check the scheduled time is completed.
194: *
195: * author T.Elanjchezhiyan(chezhiyan@siptech.co.in)
196: */
197: private void stopScheduler() {
198: long delay = System.currentTimeMillis() - endTime;
199: if ((delay >= 0)) {
200: running = false;
201: }
202: }
203:
204: /**
205: * Wait until the scheduled start time if necessary
206: *
207: * Author T.Elanjchezhiyan(chezhiyan@siptech.co.in)
208: */
209: private void startScheduler() {
210: long delay = (startTime - System.currentTimeMillis());
211: if (delay > 0) {
212: try {
213: Thread.sleep(delay);
214: } catch (Exception e) {
215: }
216: }
217: }
218:
219: public void setThreadName(String threadName) {
220: this .threadName = threadName;
221: }
222:
223: /*
224: * See below for reason for this change. Just in case this causes problems,
225: * allow the change to be backed out
226: */
227: private static final boolean startEarlier = JMeterUtils
228: .getPropDefault("jmeterthread.startearlier", true); // $NON-NLS-1$
229:
230: private static final boolean reversePostProcessors = JMeterUtils
231: .getPropDefault("jmeterthread.reversePostProcessors", false); // $NON-NLS-1$
232:
233: static {
234: if (startEarlier) {
235: log
236: .info("jmeterthread.startearlier=true (see jmeter.properties)");
237: } else {
238: log
239: .info("jmeterthread.startearlier=false (see jmeter.properties)");
240: }
241: if (reversePostProcessors) {
242: log.info("Running PostProcessors in reverse order");
243: } else {
244: log.info("Running PostProcessors in forward order");
245: }
246: }
247:
248: public void run() {
249: try {
250: initRun();
251: while (running) {
252: Sampler sam;
253: while (running && (sam = controller.next()) != null) {
254: try {
255: threadContext.setCurrentSampler(sam);
256:
257: // Check if we are running a transaction
258: TransactionSampler transactionSampler = null;
259: if (sam instanceof TransactionSampler) {
260: transactionSampler = (TransactionSampler) sam;
261: }
262: // Find the package for the transaction
263: SamplePackage transactionPack = null;
264: if (transactionSampler != null) {
265: transactionPack = compiler
266: .configureTransactionSampler(transactionSampler);
267:
268: // Check if the transaction is done
269: if (transactionSampler.isTransactionDone()) {
270: // Get the transaction sample result
271: SampleResult transactionResult = transactionSampler
272: .getTransactionResult();
273: transactionResult
274: .setThreadName(threadName);
275: transactionResult
276: .setGroupThreads(threadGroup
277: .getNumberOfThreads());
278: transactionResult
279: .setAllThreads(JMeterContextService
280: .getNumberOfThreads());
281:
282: // Check assertions for the transaction sample
283: checkAssertions(transactionPack
284: .getAssertions(),
285: transactionResult);
286: // Notify listeners with the transaction sample result
287: notifyListeners(transactionPack
288: .getSampleListeners(),
289: transactionResult);
290: compiler.done(transactionPack);
291: // Transaction is done, we do not have a sampler to sample
292: sam = null;
293: } else {
294: // It is the sub sampler of the transaction that will be sampled
295: sam = transactionSampler
296: .getSubSampler();
297: }
298: }
299:
300: // Check if we have a sampler to sample
301: if (sam != null) {
302: // Get the sampler ready to sample
303: SamplePackage pack = compiler
304: .configureSampler(sam);
305:
306: // Hack: save the package for any transaction
307: // controllers
308: threadContext.getVariables().putObject(
309: PACKAGE_OBJECT, pack);
310:
311: delay(pack.getTimers());
312: Sampler sampler = pack.getSampler();
313: sampler.setThreadContext(threadContext);
314: sampler.setThreadName(threadName);
315: TestBeanHelper.prepare(sampler);
316:
317: // Perform the actual sample
318: SampleResult result = sampler.sample(null);
319: // TODO: remove this useless Entry parameter
320:
321: // If we got any results, then perform processing on the result
322: if (result != null) {
323: result.setGroupThreads(threadGroup
324: .getNumberOfThreads());
325: result
326: .setAllThreads(JMeterContextService
327: .getNumberOfThreads());
328: result.setThreadName(threadName);
329: threadContext.setPreviousResult(result);
330: runPostProcessors(pack
331: .getPostProcessors());
332: checkAssertions(pack.getAssertions(),
333: result);
334: // Do not send subsamples to listeners which receive the transaction sample
335: List sampleListeners = getSampleListeners(
336: pack, transactionPack,
337: transactionSampler);
338: notifyListeners(sampleListeners, result);
339: compiler.done(pack);
340: // Add the result as subsample of transaction if we are in a transaction
341: if (transactionSampler != null) {
342: transactionSampler
343: .addSubSamplerResult(result);
344: }
345:
346: // Check if thread or test should be stopped
347: if (result.isStopThread()
348: || (!result.isSuccessful() && onErrorStopThread)) {
349: stopThread();
350: }
351: if (result.isStopTest()
352: || (!result.isSuccessful() && onErrorStopTest)) {
353: stopTest();
354: }
355: } else {
356: compiler.done(pack); // Finish up
357: }
358: }
359: if (scheduler) {
360: // checks the scheduler to stop the iteration
361: stopScheduler();
362: }
363: } catch (JMeterStopTestException e) {
364: log.info("Stopping Test: " + e.toString());
365: stopTest();
366: } catch (JMeterStopThreadException e) {
367: log.info("Stopping Thread: " + e.toString());
368: stopThread();
369: } catch (Exception e) {
370: log.error("", e);
371: }
372: }
373: if (controller.isDone()) {
374: running = false;
375: }
376: }
377: }
378: // Might be found by contoller.next()
379: catch (JMeterStopTestException e) {
380: log.info("Stopping Test: " + e.toString());
381: stopTest();
382: } catch (JMeterStopThreadException e) {
383: log.info("Stop Thread seen: " + e.toString());
384: } catch (Exception e) {
385: log.error("Test failed!", e);
386: } catch (ThreadDeath e) {
387: throw e; // Must not ignore this one
388: } catch (Error e) {// Make sure errors are output to the log file
389: log.error("Test failed!", e);
390: } finally {
391: threadContext.clear();
392: log.info("Thread " + threadName + " is done");
393: monitor.threadFinished(this );
394: threadFinished();
395: }
396: }
397:
398: /**
399: * Get the SampleListeners for the sampler. Listeners who receive transaction sample
400: * will not be in this list.
401: *
402: * @param samplePack
403: * @param transactionPack
404: * @param transactionSampler
405: * @return the listeners who should receive the sample result
406: */
407: private List getSampleListeners(SamplePackage samplePack,
408: SamplePackage transactionPack,
409: TransactionSampler transactionSampler) {
410: List sampleListeners = samplePack.getSampleListeners();
411: // Do not send subsamples to listeners which receive the transaction sample
412: if (transactionSampler != null) {
413: ArrayList onlySubSamplerListeners = new ArrayList();
414: List transListeners = transactionPack.getSampleListeners();
415: for (Iterator i = sampleListeners.iterator(); i.hasNext();) {
416: SampleListener listener = (SampleListener) i.next();
417: // Check if this instance is present in transaction listener list
418: boolean found = false;
419: for (Iterator j = transListeners.iterator(); j
420: .hasNext();) {
421: // Check for the same instance
422: if (j.next() == listener) {
423: found = true;
424: break;
425: }
426: }
427: if (!found) {
428: onlySubSamplerListeners.add(listener);
429: }
430: }
431: sampleListeners = onlySubSamplerListeners;
432: }
433: return sampleListeners;
434: }
435:
436: /**
437: *
438: */
439: protected void initRun() {
440: threadContext = JMeterContextService.getContext();
441: threadContext.setVariables(threadVars);
442: threadContext.setThreadNum(getThreadNum());
443: threadContext.getVariables().put(LAST_SAMPLE_OK, "true");
444: threadContext.setThread(this );
445: threadContext.setThreadGroup(threadGroup);
446: testTree.traverse(compiler);
447: // listeners = controller.getListeners();
448: if (scheduler) {
449: // set the scheduler to start
450: startScheduler();
451: }
452: rampUpDelay();
453: log.info("Thread " + Thread.currentThread().getName()
454: + " started");
455: JMeterContextService.incrNumberOfThreads();
456: threadGroup.incrNumberOfThreads();
457: GuiPackage gp = GuiPackage.getInstance();
458: if (gp != null) {// check there is a GUI
459: gp.getMainFrame().updateCounts();
460: }
461: /*
462: * Setting SamplingStarted before the contollers are initialised allows
463: * them to access the running values of functions and variables (however
464: * it does not seem to help with the listeners)
465: */
466: if (startEarlier)
467: threadContext.setSamplingStarted(true);
468: controller.initialize();
469: controller.addIterationListener(new IterationListener());
470: if (!startEarlier)
471: threadContext.setSamplingStarted(true);
472: threadStarted();
473: }
474:
475: /**
476: *
477: */
478: private void threadStarted() {
479: ThreadListenerTraverser startup = new ThreadListenerTraverser(
480: true);
481: testTree.traverse(startup);
482: }
483:
484: /**
485: *
486: */
487: private void threadFinished() {
488: ThreadListenerTraverser shut = new ThreadListenerTraverser(
489: false);
490: testTree.traverse(shut);
491: JMeterContextService.decrNumberOfThreads();
492: threadGroup.decrNumberOfThreads();
493: GuiPackage gp = GuiPackage.getInstance();
494: if (gp != null) {
495: gp.getMainFrame().updateCounts();
496: }
497: }
498:
499: private static class ThreadListenerTraverser implements
500: HashTreeTraverser {
501: private boolean isStart = false;
502:
503: private ThreadListenerTraverser(boolean start) {
504: isStart = start;
505: }
506:
507: public void addNode(Object node, HashTree subTree) {
508: if (node instanceof ThreadListener) {
509: ThreadListener tl = (ThreadListener) node;
510: if (isStart) {
511: tl.threadStarted();
512: } else {
513: tl.threadFinished();
514: }
515: }
516: }
517:
518: public void subtractNode() {
519: }
520:
521: public void processPath() {
522: }
523: }
524:
525: public String getThreadName() {
526: return threadName;
527: }
528:
529: public void stop() {
530: running = false;
531: log.info("Stopping " + threadName);
532: }
533:
534: private void stopTest() {
535: running = false;
536: log.info("Stop Test detected by thread " + threadName);
537: // engine.stopTest();
538: if (engine != null)
539: engine.askThreadsToStop();
540: }
541:
542: private void stopThread() {
543: running = false;
544: log.info("Stop Thread detected by thread " + threadName);
545: }
546:
547: private void checkAssertions(List assertions, SampleResult result) {
548: Iterator iter = assertions.iterator();
549: while (iter.hasNext()) {
550: Assertion assertion = (Assertion) iter.next();
551: TestBeanHelper.prepare((TestElement) assertion);
552: AssertionResult assertionResult = assertion
553: .getResult(result);
554: result.setSuccessful(result.isSuccessful()
555: && !(assertionResult.isError() || assertionResult
556: .isFailure()));
557: result.addAssertionResult(assertionResult);
558: }
559: threadContext.getVariables().put(LAST_SAMPLE_OK,
560: Boolean.toString(result.isSuccessful()));
561: }
562:
563: private void runPostProcessors(List extractors) {
564: ListIterator iter;
565: if (reversePostProcessors) {// Original (rather odd) behaviour
566: iter = extractors.listIterator(extractors.size());// start at the end
567: while (iter.hasPrevious()) {
568: PostProcessor ex = (PostProcessor) iter.previous();
569: TestBeanHelper.prepare((TestElement) ex);
570: ex.process();
571: }
572: } else {
573: iter = extractors.listIterator(); // start at the beginning
574: while (iter.hasNext()) {
575: PostProcessor ex = (PostProcessor) iter.next();
576: TestBeanHelper.prepare((TestElement) ex);
577: ex.process();
578: }
579: }
580: }
581:
582: private void delay(List timers) {
583: long sum = 0;
584: Iterator iter = timers.iterator();
585: while (iter.hasNext()) {
586: Timer timer = (Timer) iter.next();
587: TestBeanHelper.prepare((TestElement) timer);
588: sum += timer.delay();
589: }
590: if (sum > 0) {
591: try {
592: Thread.sleep(sum);
593: } catch (InterruptedException e) {
594: log.error("", e);
595: }
596: }
597: }
598:
599: private void notifyTestListeners() {
600: threadVars.incIteration();
601: Iterator iter = testListeners.iterator();
602: while (iter.hasNext()) {
603: TestListener listener = (TestListener) iter.next();
604: if (listener instanceof TestElement) {
605: listener.testIterationStart(new LoopIterationEvent(
606: controller, threadVars.getIteration()));
607: ((TestElement) listener).recoverRunningVersion();
608: } else {
609: listener.testIterationStart(new LoopIterationEvent(
610: controller, threadVars.getIteration()));
611: }
612: }
613: }
614:
615: private void notifyListeners(List listeners, SampleResult result) {
616: SampleEvent event = new SampleEvent(result, threadGroup
617: .getName());
618: notifier.notifyListeners(event, listeners);
619:
620: }
621:
622: public void setInitialDelay(int delay) {
623: initialDelay = delay;
624: }
625:
626: /**
627: * Initial delay if ramp-up period is active for this threadGroup.
628: */
629: private void rampUpDelay() {
630: if (initialDelay > 0) {
631: try {
632: Thread.sleep(initialDelay);
633: } catch (InterruptedException e) {
634: }
635: }
636: }
637:
638: /**
639: * Returns the threadNum.
640: */
641: public int getThreadNum() {
642: return threadNum;
643: }
644:
645: /**
646: * Sets the threadNum.
647: *
648: * @param threadNum
649: * the threadNum to set
650: */
651: public void setThreadNum(int threadNum) {
652: this .threadNum = threadNum;
653: }
654:
655: private class IterationListener implements LoopIterationListener {
656: /*
657: * (non-Javadoc)
658: *
659: * @see LoopIterationListener#iterationStart(LoopIterationEvent)
660: */
661: public void iterationStart(LoopIterationEvent iterEvent) {
662: notifyTestListeners();
663: }
664: }
665:
666: /**
667: * Save the engine instance for access to the stop methods
668: *
669: * @param engine
670: */
671: public void setEngine(StandardJMeterEngine engine) {
672: this .engine = engine;
673: }
674:
675: /**
676: * Should Test stop on sampler error?
677: *
678: * @param b -
679: * true or false
680: */
681: public void setOnErrorStopTest(boolean b) {
682: onErrorStopTest = b;
683: }
684:
685: /**
686: * Should Thread stop on Sampler error?
687: *
688: * @param b -
689: * true or false
690: */
691: public void setOnErrorStopThread(boolean b) {
692: onErrorStopThread = b;
693: }
694:
695: public ThreadGroup getThreadGroup() {
696: return threadGroup;
697: }
698:
699: public void setThreadGroup(ThreadGroup group) {
700: this.threadGroup = group;
701: }
702:
703: }
|