001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.jbossmq.stress;
023:
024: import javax.jms.JMSException;
025:
026: import junit.framework.Assert;
027:
028: import org.jboss.test.jbossmq.MQBase;
029:
030: /**
031: * According to the JMS spec, section 6.11.1, it should be possible to have
032: * several durable subscriptions under one client id. We test for this here.
033: *
034: * @author <a href="mailto:pra@tim.se">Peter Antman</a>
035: * @version $Revision: 57211 $
036: */
037:
038: public class MultipleDurableSubscribers extends MQBase {
039: class PiggyBackWorker extends TopicWorker {
040: public PiggyBackWorker(TopicWorker worker) {
041: super ();
042: connection = worker.connection;
043: destination = worker.destination;
044: session = worker.session;
045: }
046:
047: public void connect() {
048: log.debug("In null connect");
049: // Does nothing
050: }
051:
052: public void subscribe() throws JMSException {
053: super .subscribe();
054: log.debug("Message consumer set up " + consumer);
055: }
056: }
057:
058: public MultipleDurableSubscribers(String name) {
059: super (name);
060: }
061:
062: // This is build the same way as Durable test, to make it possible to take
063: // server down in between.
064: /**
065: * Test setting up a durable subscription. Disconnect after half
066: * the messages have been sent. Connect later to see if they are still there.
067: * This test is done it two parts to be able to take down the server in
068: * between
069: */
070: public void runDurableSubscriberPartOne() throws Exception {
071: try {
072: // Clean testarea up
073: drainTopic();
074:
075: int ic = getIterationCount();
076:
077: // Set up a durable subscriber
078: IntRangeMessageFilter f1 = new IntRangeMessageFilter(
079: javax.jms.Message.class, "DURABLE_NR", 0, ic / 2);
080:
081: TopicWorker sub1 = new TopicWorker(SUBSCRIBER, TRANS_NONE,
082: f1);
083: sub1.setDurable("john", "needle", "sub1");
084: Thread t1 = new Thread(sub1);
085: t1.start();
086:
087: log.debug("Sub1 set up");
088: sleep(5000L);
089: TopicWorker sub2 = new PiggyBackWorker(sub1);
090: sub2.setSubscriberAttrs(SUBSCRIBER, TRANS_NONE, f1);
091: sub2.setDurable("john", "needle", "sub2");
092: Thread t2 = new Thread(sub2);
093: t2.start();
094: log.debug("Sub2 setup");
095:
096: // Publish
097: IntRangeMessageCreator c1 = new IntRangeMessageCreator(
098: "DURABLE_NR", 0);
099: TopicWorker pub1 = new TopicWorker(PUBLISHER, TRANS_NONE,
100: c1, ic / 2);
101: pub1.connect();
102: pub1.publish();
103:
104: Assert.assertEquals(
105: "Publisher did not publish correct number of messages "
106: + pub1.getMessageHandled(), ic / 2, pub1
107: .getMessageHandled());
108:
109: // let sub1 have some time to handle the messages.
110: log.debug("Sleeping for " + ((ic * 10) / 60000)
111: + " minutes");
112: sleep(ic * 10);
113:
114: Assert.assertEquals(
115: "Subscriber1 did not get correct number of messages "
116: + sub1.getMessageHandled(), ic / 2, sub1
117: .getMessageHandled());
118: Assert.assertEquals(
119: "Subscriber2 did not get correct number of messages "
120: + sub1.getMessageHandled(), ic / 2, sub2
121: .getMessageHandled());
122:
123: // Take down subs
124: sub1.close();
125: t1.interrupt();
126: sub2.close();
127: t2.interrupt();
128:
129: //Publish some more
130: pub1.publish(ic / 2);
131: Assert.assertEquals(
132: "Publisher did not publish correct number of messages "
133: + pub1.getMessageHandled(), ic, pub1
134: .getMessageHandled());
135:
136: pub1.close();
137: } catch (Throwable t) {
138: log.error("Error in test: " + t, t);
139: throw new Exception(t.getMessage());
140: }
141: }
142:
143: /**
144: * Part two of durable subscriber test, part one should be run before
145: * this is run.
146: */
147: public void runDurableSubscriberPartTwo() throws Exception {
148: try {
149: int ic = getIterationCount();
150: // Set up a durable subscriber
151: IntRangeMessageFilter f1 = new IntRangeMessageFilter(
152: javax.jms.Message.class, "DURABLE_NR", 0, ic / 2);
153:
154: TopicWorker sub1 = new TopicWorker(SUBSCRIBER, TRANS_NONE,
155: f1);
156: sub1.setDurable("john", "needle", "sub1");
157:
158: // Start up subscription again
159: Thread t1 = new Thread(sub1);
160: t1.start();
161: sleep(5000L);
162: TopicWorker sub2 = new PiggyBackWorker(sub1);
163: sub2.setSubscriberAttrs(SUBSCRIBER, TRANS_NONE, f1);
164: sub2.setDurable("john", "needle", "sub2");
165: Thread t2 = new Thread(sub2);
166: t2.start();
167:
168: log.debug("Sleeping for " + ((ic * 10) / 60000)
169: + " minutes");
170: sleep(ic * 10);
171: Assert.assertEquals(
172: "Subscriber did not get correct number of messages "
173: + sub1.getMessageHandled(), ic / 2, sub1
174: .getMessageHandled());
175: Assert.assertEquals(
176: "Subscriber did not get correct number of messages "
177: + sub1.getMessageHandled(), ic / 2, sub2
178: .getMessageHandled());
179:
180: //OK, take everything down
181: sub1.unsubscribe();
182: sub2.unsubscribe();
183: sub1.close();
184: t1.interrupt();
185: sub2.close();
186: t2.interrupt();
187: } catch (Throwable t) {
188: log.error("Error in test: " + t, t);
189: throw new Exception(t.getMessage());
190: }
191: }
192:
193: public void testDurableSubscriber() throws Exception {
194: runDurableSubscriberPartOne();
195: runDurableSubscriberPartTwo();
196: }
197:
198: public static void main(String[] args) {
199:
200: }
201:
202: } // MultipleDurableSubscribers
|