001: /******************************************************************************
002: * Copyright (C) Lars Ivar Almli. All rights reserved. *
003: * ---------------------------------------------------------------------------*
004: * This file is part of MActor. *
005: * *
006: * MActor is free software; you can redistribute it and/or modify *
007: * it under the terms of the GNU General Public License as published by *
008: * the Free Software Foundation; either version 2 of the License, or *
009: * (at your option) any later version. *
010: * *
011: * MActor is distributed in the hope that it will be useful, *
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014: * GNU General Public License for more details. *
015: * *
016: * You should have received a copy of the GNU General Public License *
017: * along with MActor; if not, write to the Free Software *
018: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019: ******************************************************************************/package org.mactor.framework;
020:
021: import java.util.HashMap;
022: import java.util.LinkedList;
023: import java.util.List;
024: import java.util.Map;
025: import org.apache.log4j.Logger;
026: import org.mactor.brokers.Message;
027: import org.mactor.brokers.MessageBrokerManager;
028: import org.mactor.brokers.MessageSubscriber;
029: import org.mactor.framework.TestEvent.EventType;
030: import org.mactor.framework.commandexecutors.ActionCommandExecutor;
031: import org.mactor.framework.commandexecutors.ActionCommandExecutorFactory;
032: import org.mactor.framework.commandexecutors.MessageBuilderCommandExecutorFactory;
033: import org.mactor.framework.commandexecutors.MessageSelectorCommandExecutorFacotry;
034: import org.mactor.framework.commandexecutors.ValueCommandExecutor;
035: import org.mactor.framework.commandexecutors.ValueCommandExecutorFactory;
036: import org.mactor.framework.spec.ActionSpec;
037: import org.mactor.framework.spec.LoopSpec;
038: import org.mactor.framework.spec.MessagePublishSpec;
039: import org.mactor.framework.spec.MessageReceiveSpec;
040: import org.mactor.framework.spec.MessageRespondSpec;
041: import org.mactor.framework.spec.MessageSubscribeSpec;
042: import org.mactor.framework.spec.SpecNode;
043: import org.mactor.framework.spec.TestSpec;
044: import org.mactor.framework.spec.ValueSpec;
045:
046: /**
047: * The test-runner implementation
048: *
049: * @author Lars Ivar Almli
050: */
051: public class TestSpecRunner {
052: protected static Logger log = Logger
053: .getLogger(TestSpecRunner.class);
054: protected static Logger test_timing_log = Logger
055: .getLogger("test_timing");
056:
057: private TestContextImpl context;
058: private TestFeedbackListener feedbackListener;
059: private MessageBrokerManager brokerManager;
060: private Map<String, MessageWaiter> messageWaiters = new HashMap<String, MessageWaiter>();
061: private TestSpec testSpec;
062: private int testDataIndex;
063: private String testId = AppUtil.getAppInstanceId() + "_"
064: + getNextId();
065: private static Object id_lock = new Object();
066: private static long counter = 0;
067:
068: private static long getNextId() {
069: synchronized (id_lock) {
070: return counter++;
071: }
072: }
073:
074: private void cleanUp() {
075: try {
076: for (MessageWaiter mw : messageWaiters.values()) {
077: this .brokerManager.unsubscribe(mw.getChannel(), mw);
078: mw.stop();
079: }
080: messageWaiters.clear();
081: } catch (Exception e) {
082: log.error("Failed to unsubscribe", e);
083: }
084: }
085:
086: public TestSpecRunner(TestContextImpl context,
087: MessageBrokerManager brokerManager,
088: TestFeedbackListener feedbackListener)
089: throws MactorException {
090: this .context = context;
091: this .brokerManager = brokerManager;
092: this .testSpec = context.getTestSpec();
093: this .feedbackListener = feedbackListener;
094: }
095:
096: private void reportNodeStart(SpecNode node) {
097: feedbackListener.onEvent(new TestEvent(EventType.Start, node,
098: testDataIndex), context);
099: }
100:
101: private void reportSuccessfulNodeEnd(SpecNode node, String output) {
102: feedbackListener.onEvent(new TestEvent(EventType.End, node,
103: testDataIndex, output, true), context);
104: }
105:
106: private void reportFaultyNodeEnd(SpecNode node, String output,
107: MactorException cause) {
108: feedbackListener.onEvent(new TestEvent(EventType.End, node,
109: testDataIndex, output, false, cause), context);
110: }
111:
112: private void reportTerminated(boolean success, MactorException cause) {
113: feedbackListener.onEvent(new TestEvent(EventType.End, testSpec,
114: testDataIndex, null, success, cause), context);
115: }
116:
117: private void reportTestStart() {
118: feedbackListener.onEvent(new TestEvent(EventType.Start,
119: testSpec, testDataIndex), context);
120: }
121:
122: public void runTest(int testDataIndex) throws MactorException {
123: this .testDataIndex = testDataIndex;
124: if (testSpec.getDelayBeforeStartSeconds() != 0)
125: sleep(testSpec.getDelayBeforeStartSeconds() * 1000);
126: reportTestStart();
127: MactorException ex = null;
128: try {
129: try {
130: doNodes(testSpec.getSpecNodes());
131: } catch (MactorException e) {
132: ex = e;
133: throw e;
134: }
135: } finally {
136: cleanUp();
137: reportTerminated(ex == null, ex);
138: }
139: }
140:
141: private void doNodes(List<SpecNode> nodes) throws MactorException {
142: for (SpecNode node : nodes) {
143: try {
144: reportNodeStart(node);
145: doNode(node);
146: reportSuccessfulNodeEnd(node, "Success");
147: } catch (MactorException ce) {
148: reportFaultyNodeEnd(node, "Error", ce);
149: throw ce;
150: } catch (RuntimeException e) {
151: MactorException re = new MactorException(e);
152: reportFaultyNodeEnd(node, "Runtime error", re);
153: throw re;
154: }
155: }
156: }
157:
158: private void doNode(SpecNode node) throws MactorException {
159: if (log.isDebugEnabled())
160: log.debug("starting on node " + node.getName());
161: long nodeStartTime = 0;
162: Exception ex = null;
163: if (test_timing_log.isInfoEnabled()) {
164: nodeStartTime = System.currentTimeMillis();
165:
166: }
167: try {
168: if (node instanceof MessageSubscribeSpec) {
169: doMessageSubscribeNode((MessageSubscribeSpec) node);
170: } else if (node instanceof MessagePublishSpec) {
171: doMessagePublishNode((MessagePublishSpec) node);
172: } else if (node instanceof MessageReceiveSpec) {
173: doMessageReceiveNode((MessageReceiveSpec) node);
174: } else if (node instanceof ValueSpec) {
175: doValueNode((ValueSpec) node);
176: } else if (node instanceof ActionSpec) {
177: doActionNode((ActionSpec) node);
178: } else if (node instanceof MessageRespondSpec) {
179: doMessageResponseNode((MessageRespondSpec) node);
180: } else if (node instanceof LoopSpec) {
181: doLoopNode((LoopSpec) node);
182: } else {
183: throw new MactorException("Unsupported node:"
184: + node.getName());
185: }
186: } catch (MactorException me) {
187: ex = me;
188: if (log.isDebugEnabled())
189: log.debug("%%%%% NODE: " + node.getName());
190: throw me;
191: } catch (RuntimeException re) {
192: ex = re;
193: throw re;
194: } finally {
195: if (test_timing_log.isInfoEnabled()) {
196: test_timing_log.info(testId + ";" + node.getName()
197: + ";" + (ex == null) + ";"
198: + (System.currentTimeMillis() - nodeStartTime));
199: }
200: }
201: }
202:
203: private void doMessageSubscribeNode(MessageSubscribeSpec spec)
204: throws MactorException {
205: MessageWaiter mw = new MessageWaiter(spec.getChannel());
206: if (messageWaiters.containsKey(spec.getName()))
207: throw new RuntimeException("Unexpected state");
208: messageWaiters.put(spec.getName(), mw);
209: this .brokerManager.subscribe(spec.getChannel(), mw,
210: MessageSelectorCommandExecutorFacotry.createExecutor(
211: context, spec.getMessageSelector()));
212: }
213:
214: private void doMessagePublishNode(MessagePublishSpec spec)
215: throws MactorException {
216: Message message = MessageBuilderCommandExecutorFactory
217: .createExecutor(context,
218: ((MessagePublishSpec) spec).getMessageBuilder())
219: .buildMessage(context);
220: message.getMessageContextInfo().setNode(spec);
221: context.addOutgoingMessage(spec.getName(), message);
222: if (spec.isAcceptResponse()) {
223: Message response = this .brokerManager.publishWithResponse(
224: spec.getChannel(), message);
225: if (response != null) {
226: context.addReceivedMessage(spec.getName(), response);
227: }
228: } else {
229: this .brokerManager.publish(spec.getChannel(), message);
230: }
231: }
232:
233: private void doMessageReceiveNode(MessageReceiveSpec spec)
234: throws MactorException {
235: MessageWaiter mw = messageWaiters.get(spec
236: .getMessageSubscribeNodeName());
237: mw.start(spec.getMaxTimeoutSeconds(),
238: spec.getMinMessageCount(), spec.getMaxMessageCount(),
239: spec.isBlockUntilTimeout());
240: while (true) {
241: IncomingMessage im = null;
242: try {
243: im = mw.getMessage();
244: if (im == null)
245: break;
246: im.getMessage().getMessageContextInfo().setNode(spec);
247: if (spec.hasResponseNode())
248: pushPendingMessage(im);
249: context.addReceivedMessage(spec.getName(), im.message);
250: doNodes(spec.getSpecNodes());
251: im.completed();
252: } catch (Throwable t) {
253: if (im != null)
254: im.canceled();
255: if (t instanceof MactorException)
256: throw (MactorException) t;
257: else
258: throw new MactorException(t);
259: }
260: }
261: }
262:
263: private void doLoopNode(LoopSpec spec) throws MactorException {
264: int count = spec.getCount();
265: if (count == 0)
266: count = Integer.MAX_VALUE;
267: for (int i = 0; i < count; i++) {
268: doNodes(spec.getSpecNodes());
269: }
270: }
271:
272: private void doMessageResponseNode(MessageRespondSpec spec)
273: throws MactorException {
274: Message responseMessage = MessageBuilderCommandExecutorFactory
275: .createExecutor(context,
276: ((MessageRespondSpec) spec).getMessageBuilder())
277: .buildMessage(context);
278: context.addOutgoingMessage(spec.getName(), responseMessage);
279: popPendingMessage().setResponseMessage(responseMessage);
280: }
281:
282: private LinkedList<IncomingMessage> pendingMessages = new LinkedList<IncomingMessage>();
283:
284: private IncomingMessage popPendingMessage() {
285: return pendingMessages.removeLast();
286: }
287:
288: private void pushPendingMessage(IncomingMessage rw) {
289: pendingMessages.addLast(rw);
290: }
291:
292: private void doValueNode(ValueSpec spec) throws MactorException {
293: ValueCommandExecutor vce = ValueCommandExecutorFactory
294: .createExecutor(context, spec);
295: context.setValue(spec.getName(), vce.extractValue(context));
296: }
297:
298: private void doActionNode(ActionSpec spec) throws MactorException {
299: ActionCommandExecutor vce = ActionCommandExecutorFactory
300: .createExecutor(context, spec);
301: vce.perform(context);
302: }
303:
304: public static class IncomingMessage {
305: Message message;
306: Message responseMessage;
307: boolean complete = false;
308:
309: public IncomingMessage(Message message) {
310: this .message = message;
311: }
312:
313: public void setResponseMessage(Message responseMessage) {
314: this .responseMessage = responseMessage;
315: }
316:
317: private Object lock = new Object();
318:
319: public void completed() {
320: synchronized (lock) {
321: complete = true;
322: message.consume();
323: lock.notifyAll();
324: }
325: }
326:
327: public void canceled() {
328: synchronized (lock) {
329: complete = true;
330: lock.notifyAll();
331: }
332: }
333:
334: Message waitForCompletion() throws InterruptedException {
335: synchronized (lock) {
336: if (!complete)
337: lock.wait();
338: }
339: return responseMessage;
340: }
341:
342: public Message getMessage() {
343: return message;
344: }
345: }
346:
347: public static class MessageWaiter implements MessageSubscriber {
348: private LinkedList<IncomingMessage> messages = new LinkedList<IncomingMessage>();
349: private int maxMessageCount;
350: private int minMessageCount;
351: private int maxTimeoutSeconds;
352: private boolean blockUntilTimeout;
353: private Object lock = new Object();
354: private int count;
355: private int returnedCount;
356: private long timeStart;
357: private String channel;
358: private boolean stopped = false;
359: private boolean started = false;
360:
361: public String getChannel() {
362: return channel;
363: }
364:
365: public MessageWaiter(String channel) {
366: this .channel = channel;
367: }
368:
369: public void stop() {
370: synchronized (lock) {
371: stopped = true;
372: }
373: for (IncomingMessage m : messages) {
374: m.canceled();
375: }
376: }
377:
378: public void start(int maxTimeoutSeconds, int minMessageCount,
379: int maxMessageCount, boolean blockUntilTimeout) {
380: /*
381: * if(started) throw new RuntimeException("Invalid state.
382: * MessageWaiter already started"); started = true;
383: */
384: this .returnedCount = 0;
385: this .maxTimeoutSeconds = maxTimeoutSeconds;
386: this .minMessageCount = minMessageCount;
387: this .maxMessageCount = maxMessageCount;
388: this .blockUntilTimeout = blockUntilTimeout;
389: this .timeStart = System.currentTimeMillis();
390: if (maxTimeoutSeconds == 0)
391: this .maxTimeoutSeconds = 60 * 60 * 24 * 365; // a year
392: if (maxMessageCount == 0)
393: this .maxMessageCount = 1000000000; // one billion
394: }
395:
396: public Message onMessage(Message message) {
397: try {
398: IncomingMessage im = new IncomingMessage(message);
399: synchronized (lock) {
400: if (stopped) {
401: lock.notifyAll();
402: return null;
403: }
404: this .messages.add(im);
405: count++;
406: lock.notifyAll();
407: }
408: return im.waitForCompletion();
409: } catch (Exception e) {
410: e.printStackTrace();
411: return null;
412: }
413: }
414:
415: public IncomingMessage getMessage()
416: throws MessageTimoutException {
417: while (true) {
418: if (this .returnedCount >= this .maxMessageCount)
419: return null;
420: if (!blockUntilTimeout
421: && this .returnedCount >= this .minMessageCount)
422: return null;
423: long timeLeft = maxTimeoutSeconds * 1000
424: - (System.currentTimeMillis() - timeStart);
425: if (timeLeft <= 0) {
426: if (this .returnedCount >= this .minMessageCount)
427: return null;
428: else
429: throw new MessageTimoutException(
430: "Timout reached after receiving "
431: + this .returnedCount
432: + " messages");
433: }
434: try {
435: synchronized (lock) {
436: if (messages.size() > 0) {
437: IncomingMessage m = messages.removeLast();
438: returnedCount++;
439: return m;
440: } else if (timeLeft > 0) {
441: lock.wait(timeLeft);
442: }
443: if (messages.size() > 0) {
444: IncomingMessage m = messages.removeLast();
445: returnedCount++;
446: return m;
447: }
448: }
449: } catch (InterruptedException ie) {
450: throw new RuntimeException(ie);
451: }
452: }
453: }
454: }
455:
456: private void sleep(long millis) {
457: try {
458: Thread.sleep(millis);
459: } catch (InterruptedException ie) {
460: }
461: }
462:
463: }
|