001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.jbossmq.stress;
023:
024: import junit.framework.TestSuite;
025: import junit.framework.Assert;
026:
027: import org.jboss.test.jbossmq.MQBase;
028:
029: /**
030: * Durable subscriber tests.
031: *
032: * @author <a href="mailto:pra@tim.se">Peter Antman</a>
033: * @version $Revision: 57211 $
034: */
035:
036: public class DurableSubscriberTest extends MQBase {
037:
038: public DurableSubscriberTest(String name) {
039: super (name);
040: }
041:
042: /**
043: * Test setting up a durable subscription. Disconnect after half
044: * the messages have been sent. Connect later to see if they are still there.
045: * This test is done it two parts to be able to take down the server in
046: * between
047: */
048: public void runDurableSubscriberPartOne() throws Exception {
049: try {
050: // Clean testarea up
051: drainTopic();
052:
053: int ic = getIterationCount();
054:
055: // Set up a durable subscriber
056: IntRangeMessageFilter f1 = new IntRangeMessageFilter(
057: javax.jms.Message.class, "DURABLE_NR", 0, ic / 2);
058:
059: TopicWorker sub1 = new TopicWorker(SUBSCRIBER, TRANS_NONE,
060: f1);
061: sub1.setDurable("john", "needle", "sub2");
062: Thread t1 = new Thread(sub1);
063: t1.start();
064:
065: // Publish
066: IntRangeMessageCreator c1 = new IntRangeMessageCreator(
067: "DURABLE_NR", 0);
068: TopicWorker pub1 = new TopicWorker(PUBLISHER, TRANS_NONE,
069: c1, ic / 2);
070: pub1.connect();
071: pub1.publish();
072:
073: Assert.assertEquals(
074: "Publisher did not publish correct number of messages "
075: + pub1.getMessageHandled(), ic / 2, pub1
076: .getMessageHandled());
077:
078: // let sub1 have some time to handle the messages.
079: log.debug("Sleeping for " + ((ic * 10) / 60000)
080: + " minutes");
081: sleep(ic * 10);
082:
083: Assert.assertEquals(
084: "Subscriber did not get correct number of messages "
085: + sub1.getMessageHandled(), ic / 2, sub1
086: .getMessageHandled());
087:
088: // Take down first sub
089: sub1.close();
090: t1.interrupt();
091:
092: //Publish some more
093: pub1.publish(ic / 2);
094: Assert.assertEquals(
095: "Publisher did not publish correct number of messages "
096: + pub1.getMessageHandled(), ic, pub1
097: .getMessageHandled());
098:
099: pub1.close();
100: } catch (Throwable t) {
101: log.error("Error in test: " + t, t);
102: throw new Exception(t.getMessage());
103: }
104: }
105:
106: /**
107: * Part two of durable subscriber test, part one should be run before
108: * this is run.
109: */
110: public void runDurableSubscriberPartTwo() throws Exception {
111: try {
112: int ic = getIterationCount();
113: // Set up a durable subscriber
114: IntRangeMessageFilter f1 = new IntRangeMessageFilter(
115: javax.jms.Message.class, "DURABLE_NR", 0, ic / 2);
116:
117: TopicWorker sub1 = new TopicWorker(SUBSCRIBER, TRANS_NONE,
118: f1);
119: sub1.setDurable("john", "needle", "sub2");
120:
121: // Start up subscription again
122: Thread t2 = new Thread(sub1);
123: t2.start();
124:
125: log.debug("Sleeping for " + ((ic * 10) / 60000)
126: + " minutes");
127: sleep(ic * 10);
128: Assert.assertEquals(
129: "Subscriber did not get correct number of messages "
130: + sub1.getMessageHandled(), ic / 2, sub1
131: .getMessageHandled());
132:
133: //OK, take everything down
134: sub1.unsubscribe();
135: sub1.close();
136: t2.interrupt();
137:
138: } catch (Throwable t) {
139: log.error("Error in test: " + t, t);
140: throw new Exception(t.getMessage());
141: }
142: }
143:
144: public void testDurableSubscriber() throws Exception {
145: runDurableSubscriberPartOne();
146: runDurableSubscriberPartTwo();
147: }
148:
149: public void runGoodClient() throws Exception {
150: TopicWorker sub1 = new TopicWorker(CONNECTOR, TRANS_NONE, null);
151: sub1.setDurable("john", "needle", "sub2");
152: Thread t1 = new Thread(sub1);
153: t1.start();
154: try {
155: Thread.sleep(2000);
156: } catch (InterruptedException e) {
157: }
158: // Take it down abruptly
159: t1.interrupt();
160: sub1.close();
161: Assert.assertNull("Error in connecting durable sub", sub1
162: .getException());
163:
164: }
165:
166: /**
167: * Test connecting as a durable subscriber and diconecction without taking
168: * the connection down properly
169: */
170: public void runBadClient() throws Exception {
171: TopicWorker sub1 = new TopicWorker(CONNECTOR, TRANS_NONE, null);
172: sub1.setDurable("john", "needle", "sub2");
173: Thread t1 = new Thread(sub1);
174: t1.start();
175: try {
176: Thread.sleep(2000);
177: } catch (InterruptedException e) {
178: }
179: // Take it down abruptly
180: t1.interrupt();
181: //sub1.close();
182: Assert.assertNull("Error in connecting durable sub", sub1
183: .getException());
184: }
185:
186: public static junit.framework.Test suite() throws Exception {
187:
188: TestSuite suite = new TestSuite();
189: suite.addTest(new DurableSubscriberTest("runGoodClient"));
190: suite
191: .addTest(new DurableSubscriberTest(
192: "testDurableSubscriber"));
193:
194: //suite.addTest(new DurableSubscriberTest("testBadClient"));
195: return suite;
196: }
197:
198: public static void main(String[] args) {
199:
200: }
201:
202: } // DurableSubscriberTest
|