001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.jbossmq.stress;
023:
024: import junit.framework.TestSuite;
025: import junit.framework.Assert;
026:
027: import org.jboss.test.jbossmq.MQBase;
028:
029: /**
030: * Test queue recover.
031: *
032: * @author <a href="mailto:pra@tim.se">Peter Antman</a>
033: * @version $Revision: 57211 $
034: */
035:
036: public class QueueTest extends MQBase {
037:
038: public QueueTest(String name) {
039: super (name);
040: }
041:
042: /**
043: * This test is done it two parts to be able to take down the server in
044: * between
045: */
046: public void runQueueSubscriberPartOne() throws Exception {
047: try {
048: // Clean testarea up
049: drainQueue();
050:
051: int ic = getIterationCount();
052:
053: // Set up a durable subscriber
054: IntRangeMessageFilter f1 = new IntRangeMessageFilter(
055: javax.jms.Message.class, "QUEUE_NR", 0, ic / 2);
056:
057: QueueWorker sub1 = new QueueWorker(SUBSCRIBER, TRANS_NONE,
058: f1);
059: Thread t1 = new Thread(sub1);
060: t1.start();
061:
062: // Publish
063: IntRangeMessageCreator c1 = new IntRangeMessageCreator(
064: "QUEUE_NR", 0);
065: QueueWorker pub1 = new QueueWorker(PUBLISHER, TRANS_NONE,
066: c1, ic / 2);
067: pub1.connect();
068: pub1.publish();
069:
070: Assert.assertEquals(
071: "Publisher did not publish correct number of messages "
072: + pub1.getMessageHandled(), ic / 2, pub1
073: .getMessageHandled());
074:
075: // let sub1 have some time to handle the messages.
076: log.debug("Sleeping for " + ((ic * 10) / 60000)
077: + " minutes");
078: // let sub1 have some time to handle the messages.
079: sleep(ic * 10);
080:
081: Assert.assertEquals(
082: "Subscriber did not get correct number of messages "
083: + sub1.getMessageHandled(), ic / 2, sub1
084: .getMessageHandled());
085:
086: // Take down first sub
087: sub1.close();
088: t1.interrupt();
089:
090: //Publish some more
091: pub1.publish(ic / 2);
092: Assert.assertEquals(
093: "Publisher did not publish correct number of messages "
094: + pub1.getMessageHandled(), ic, pub1
095: .getMessageHandled());
096:
097: pub1.close();
098: } catch (Throwable t) {
099: log.error("Error in test: " + t, t);
100: throw new Exception(t.getMessage());
101: }
102: }
103:
104: /**
105: * Part two of durable subscriber test, part one should be run before
106: * this is run.
107: */
108: public void runQueueSubscriberPartTwo() throws Exception {
109: try {
110: int ic = getIterationCount();
111: // Set up a durable subscriber
112: IntRangeMessageFilter f1 = new IntRangeMessageFilter(
113: javax.jms.Message.class, "QUEUE_NR", 0, ic / 2);
114:
115: QueueWorker sub1 = new QueueWorker(SUBSCRIBER, TRANS_NONE,
116: f1);
117:
118: // Start up subscription again
119: Thread t2 = new Thread(sub1);
120: t2.start();
121:
122: log.debug("Sleeping for " + ((ic * 10) / 60000)
123: + " minutes");
124: sleep(ic * 10);
125: Assert.assertEquals(
126: "Subscriber did not get correct number of messages "
127: + sub1.getMessageHandled(), ic / 2, sub1
128: .getMessageHandled());
129:
130: //OK, take everything down
131: sub1.close();
132: t2.interrupt();
133:
134: } catch (Throwable t) {
135: log.error("Error in test: " + t, t);
136: throw new Exception(t.getMessage());
137: }
138: }
139:
140: /**
141: * Test queue without taking the server down.
142: */
143: public void testQueueSubscriber() throws Exception {
144: runQueueSubscriberPartOne();
145: runQueueSubscriberPartTwo();
146: }
147:
148: public static junit.framework.Test suite() throws Exception {
149:
150: TestSuite suite = new TestSuite();
151: suite.addTest(new QueueSubOne("testQueueSubscriber"));
152:
153: return suite;
154: }
155:
156: public static void main(String[] args) {
157:
158: }
159:
160: } // QueueTest
|