001: /******************************************************************************
002: * Copyright (C) Lars Ivar Almli. All rights reserved. *
003: * ---------------------------------------------------------------------------*
004: * This file is part of MActor. *
005: * *
006: * MActor is free software; you can redistribute it and/or modify *
007: * it under the terms of the GNU General Public License as published by *
008: * the Free Software Foundation; either version 2 of the License, or *
009: * (at your option) any later version. *
010: * *
011: * MActor is distributed in the hope that it will be useful, *
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014: * GNU General Public License for more details. *
015: * *
016: * You should have received a copy of the GNU General Public License *
017: * along with MActor; if not, write to the Free Software *
018: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019: ******************************************************************************/package org.mactor.framework;
020:
021: import java.util.LinkedList;
022: import java.util.List;
023: import org.apache.log4j.Logger;
024: import org.mactor.brokers.MessageBrokerManager;
025: import org.mactor.framework.spec.GlobalConfig;
026: import org.mactor.framework.spec.MessageBrokersConfig;
027: import org.mactor.framework.spec.TestSpec;
028:
029: /**
030: * The
031: *
032: * @author Lars Ivar Almli
033: */
034: public class TestSuiteRunner {
035: protected static Logger log = Logger
036: .getLogger(TestSuiteRunner.class);
037: private GlobalConfig globalConfig;
038: private MessageBrokerManager brokerManager;
039: private TestSpec testSpec;
040: private boolean terminate = false;
041: private static String currentGlobalConfigPath = null;
042:
043: public TestSuiteRunner(TestSpec testSpec) throws MactorException {
044: if (currentGlobalConfigPath != null
045: && !currentGlobalConfigPath.equals(testSpec
046: .getGlobalConfigPath()))
047: throw new MactorException(
048: "The test spec refers to the global config file '"
049: + testSpec.getGlobalConfigPath()
050: + "', which is different from the one already loaded.\n Start a new MActor instance to run this test");
051: currentGlobalConfigPath = testSpec.getGlobalConfigPath();
052: this .globalConfig = GlobalConfig.readFromFile(testSpec
053: .getGlobalConfigPath());
054: this .testSpec = testSpec;
055: this .brokerManager = MessageBrokerManager
056: .getManager(globalConfig.getBrokersConfig());
057: }
058:
059: /**
060: * Obs! Not thread safe
061: *
062: * @param data
063: * @param fl
064: * @return
065: * @throws MactorException
066: */
067: public int run(TestData data, TestFeedbackListener fl)
068: throws MactorException {
069: EventReporter reporter = new EventReporter(fl);
070: reporter.start();
071: try {
072: int errorCount = 0;
073: int dataCount = data.getRowCount();
074: List<TestRunnable> jobs = new LinkedList<TestRunnable>();
075: TestThreadPool pool = new TestThreadPool(globalConfig
076: .getMaxNumberOfTestThreads());
077: for (int i = 0; (i < dataCount) && !terminate; i++) {
078: TestContextImpl context = new TestContextImpl(
079: globalConfig, testSpec);
080: context.setValues(data.getRowData(i));
081: TestRunnable r = new TestRunnable(i,
082: new TestSpecRunner(context, brokerManager,
083: reporter));
084: jobs.add(r);
085: pool.addJob(r);
086: }
087: pool.join();
088: for (TestRunnable runnable : jobs) {
089: if (!runnable.isSucces())
090: errorCount++;
091: }
092: return errorCount;
093: } finally {
094: reporter.stop();
095: }
096: }
097:
098: public void terminate() {
099: terminate = true;
100: }
101:
102: public void runAsMock(TestFeedbackListener fl)
103: throws MactorException {
104: while (true) {
105: try {
106: TestContextImpl context = new TestContextImpl(
107: globalConfig, testSpec);
108: TestSpecRunner testSpecRunner = new TestSpecRunner(
109: context, brokerManager, fl);
110: testSpecRunner.runTest(0);
111: } catch (MactorException me) {
112: log.info("Mock restaring..");
113: }
114: }
115: }
116:
117: @Override
118: protected void finalize() throws Throwable {
119: super .finalize();
120: }
121:
122: // This pool will grow to the max allowed threads and stay there
123: private class TestThreadPool {
124: private int maxNumberOfThreads;
125: private boolean stopped = false;
126:
127: private TestThreadPool(int maxNumberOfThreads) {
128: this .maxNumberOfThreads = maxNumberOfThreads;
129: }
130:
131: LinkedList<Worker> idleWorkers = new LinkedList<Worker>();
132: LinkedList<Worker> activeWorkers = new LinkedList<Worker>();
133: private Object lock = new Object();
134:
135: private Worker getIdleWorker() throws InterruptedException {
136: synchronized (lock) {
137: for (;;) {
138: if (idleWorkers.size() > 0) {
139: Worker w = idleWorkers.removeLast();
140: activeWorkers.add(w);
141: return w;
142: } else if ((activeWorkers.size() + idleWorkers
143: .size()) < maxNumberOfThreads) {
144: Worker w = new Worker();
145: activeWorkers.add(w);
146: new Thread(w).start();
147: return w;
148: }
149: lock.wait();
150: }
151: }
152: }
153:
154: /**
155: * Block until thread is available
156: *
157: * @param runnable
158: */
159: public void addJob(TestRunnable runnable) {
160: try {
161: synchronized (lock) {
162: if (stopped)
163: throw new RuntimeException(
164: "The TestThreadPool has been stopped. No more jobs can be added");
165: getIdleWorker().assignJob(runnable);
166: }
167: } catch (InterruptedException ie) {
168: ie.printStackTrace();
169: }
170: }
171:
172: public void join() {
173: try {
174: synchronized (lock) {
175: for (;;) {
176: if (activeWorkers.size() == 0)
177: break;
178: lock.wait();
179: }
180: stopped = true;
181: }
182: for (Worker w : idleWorkers) {
183: w.stop();
184: }
185: } catch (InterruptedException ie) {
186: }
187: }
188:
189: private void completed(Worker w) {
190: synchronized (lock) {
191: if (!activeWorkers.remove(w))
192: throw new RuntimeException("Unexpected state");
193: idleWorkers.add(w);
194: lock.notifyAll();
195: }
196: log.debug("active:" + activeWorkers.size());
197: }
198:
199: private class Worker implements Runnable {
200: Runnable job;
201: private Object lock = new Object();
202:
203: public void run() {
204: while (!stopped) {
205: try {
206: Runnable j = null;
207: synchronized (lock) {
208: if (job != null)
209: j = job;
210: else
211: lock.wait();
212: if (job != null)
213: j = job;
214: job = null;
215: }
216: if (j != null) {
217: try {
218: j.run();
219: } finally {
220: completed(this );
221: }
222: }
223: } catch (Exception e) {
224: e.printStackTrace();
225: }
226: }
227: }
228:
229: public void stop() {
230: synchronized (this .lock) {
231: this .lock.notifyAll();
232: }
233: }
234:
235: public void assignJob(Runnable job) {
236: synchronized (this .lock) {
237: this .job = job;
238: this .lock.notifyAll();
239: }
240: }
241: }
242: }
243:
244: private class TestRunnable implements Runnable {
245: boolean succes = false;
246: int index;
247: TestSpecRunner testSpecRunner;
248:
249: public TestRunnable(int index, TestSpecRunner testSpecRunner) {
250: this .index = index;
251: this .testSpecRunner = testSpecRunner;
252: }
253:
254: public boolean isSucces() {
255: return succes;
256: }
257:
258: public void run() {
259: try {
260: testSpecRunner.runTest(index);
261: this .succes = true;
262: } catch (MactorException ce) {
263: ce.printStackTrace();
264: }
265: }
266: }
267:
268: public GlobalConfig getGlobalConfig() {
269: return globalConfig;
270: }
271:
272: public MessageBrokersConfig getMessageBrokersConfig() {
273: return globalConfig.getBrokersConfig();
274: }
275:
276: public TestSpec getTestSpec() {
277: return testSpec;
278: }
279: }
|