001: /******************************************************************************
002: * Copyright (C) Lars Ivar Almli. All rights reserved. *
003: * ---------------------------------------------------------------------------*
004: * This file is part of MActor. *
005: * *
006: * MActor is free software; you can redistribute it and/or modify *
007: * it under the terms of the GNU General Public License as published by *
008: * the Free Software Foundation; either version 2 of the License, or *
009: * (at your option) any later version. *
010: * *
011: * MActor is distributed in the hope that it will be useful, *
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of *
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
014: * GNU General Public License for more details. *
015: * *
016: * You should have received a copy of the GNU General Public License *
017: * along with MActor; if not, write to the Free Software *
018: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *
019: ******************************************************************************/package org.mactor.tests;
020:
021: import junit.framework.Assert;
022: import org.junit.Test;
023: import org.mactor.brokers.Message;
024: import org.mactor.brokers.MessageSubscriber;
025: import org.mactor.framework.MessageTimoutException;
026: import org.mactor.framework.TestSpecRunner.IncomingMessage;
027: import org.mactor.framework.TestSpecRunner.MessageWaiter;
028:
029: public class MessageWaiterTest {
030: public void messageWaiter_timeout() throws Exception {
031: MessageWaiter mw = new MessageWaiter(null);
032: long start = System.currentTimeMillis();
033: mw.start(2, 1, 1, false);
034: try {
035: IncomingMessage im = mw.getMessage();
036: Assert.fail();
037: } catch (MessageTimoutException te) {
038: Assert.assertEquals(2,
039: (System.currentTimeMillis() - start) / 1000);
040: }
041: }
042:
043: public void messageWaiter_maxcount() throws Exception {
044: MessageWaiter mw = new MessageWaiter(null);
045: long start = System.currentTimeMillis();
046: mw.start(4, 1, 3, true);
047: MP publisher = asyncMessagePublish(mw, 3, 0);
048: IncomingMessage im = mw.getMessage();
049: assertMessageContent(im.getMessage(), 0);
050: im.completed();
051: im = mw.getMessage();
052: assertMessageContent(im.getMessage(), 1);
053: im.completed();
054: im = mw.getMessage();
055: assertMessageContent(im.getMessage(), 2);
056: im.completed();
057: Assert.assertNull(mw.getMessage());
058: Assert.assertEquals(0,
059: (System.currentTimeMillis() - start) / 1000);
060: Thread.sleep(500);
061: Assert.assertTrue(publisher.isComplete());
062: }
063:
064: private void assertMessageContent(Message m, int index)
065: throws Exception {
066: Assert.assertEquals(index, Integer.parseInt(m
067: .getContentDocument().getRootElement().getText()));
068: }
069:
070: public void messageWaiter_mincount() throws Exception {
071: MessageWaiter mw = new MessageWaiter(null);
072: long start = System.currentTimeMillis();
073: mw.start(4, 2, 3, false);
074: MP publisher = asyncMessagePublish(mw, 2, 0);
075: IncomingMessage im = mw.getMessage();
076: assertMessageContent(im.getMessage(), 0);
077: im.completed();
078: im = mw.getMessage();
079: assertMessageContent(im.getMessage(), 1);
080: im.completed();
081: Assert.assertNull(mw.getMessage());
082: Thread.sleep(500);
083: Assert.assertTrue(publisher.isComplete());
084: }
085:
086: @Test
087: public void messageWaiter_cleanup() throws Exception {
088: MessageWaiter mw = new MessageWaiter(null);
089: long start = System.currentTimeMillis();
090: mw.start(4, 2, 3, false);
091: MP publisher = asyncMessagePublish(mw, 2, 0);
092: IncomingMessage im = mw.getMessage();
093: assertMessageContent(im.getMessage(), 0);
094: im.canceled();
095: mw.stop();
096: Thread.sleep(500);
097: Assert.assertTrue(publisher.isComplete());
098: }
099:
100: public void messageWaiter_block() throws Exception {
101: long start = System.currentTimeMillis();
102: try {
103: MessageWaiter mw = new MessageWaiter(null);
104: mw.start(4, 2, 0, true);
105: mw.onMessage(Message.createMessage("<a></a>"));
106: Assert.assertNotNull(mw.getMessage());
107: mw.getMessage();
108: } catch (MessageTimoutException te) {
109: Assert.assertEquals(4,
110: (System.currentTimeMillis() - start) / 1000);
111: }
112: }
113:
114: public void messageWaiter_noblock() throws Exception {
115: long start = System.currentTimeMillis();
116: MessageWaiter mw = new MessageWaiter(null);
117: mw.start(10, 2, 0, false);
118: mw.onMessage(Message.createMessage("<a></a>"));
119: mw.onMessage(Message.createMessage("<a></a>"));
120: Assert.assertNotNull(mw.getMessage());
121: Assert.assertNotNull(mw.getMessage());
122: Assert.assertEquals(0,
123: (System.currentTimeMillis() - start) / 1000);
124: }
125:
126: private MP asyncMessagePublish(MessageSubscriber ms, int count,
127: int delaySeconds) {
128: MP mp = new MP(ms, count, delaySeconds);
129: new Thread(mp).start();
130: return mp;
131: }
132:
133: private static class MP implements Runnable {
134: MessageSubscriber ms;
135: int count;
136: int delaySeconds;
137: boolean complete = false;
138:
139: public MP(MessageSubscriber ms, int count, int delaySeconds) {
140: super ();
141: this .ms = ms;
142: this .count = count;
143: this .delaySeconds = delaySeconds;
144: }
145:
146: public void run() {
147: for (int i = 0; i < count; i++) {
148: try {
149: if (delaySeconds > 0)
150: Thread.sleep(delaySeconds * 1000);
151: Message m = Message.createMessage("<m>" + i
152: + "</m>");
153: System.out.println("delivering:" + m.getContent());
154: ms.onMessage(m);
155: } catch (Exception e) {
156: e.printStackTrace();
157: }
158: }
159: complete = true;
160: System.out.println("asyncMessagePublish completed ");
161: }
162:
163: public boolean isComplete() {
164: return complete;
165: }
166: }
167: }
|