001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.jbossmq.stress;
023:
024: import javax.jms.Message;
025: import javax.jms.BytesMessage;
026: import javax.jms.JMSException;
027:
028: import junit.framework.TestSuite;
029: import junit.framework.Assert;
030:
031: import org.jboss.test.jbossmq.MQBase;
032:
033: /**
034: * @author <a href="mailto:pra@tim.se">Peter Antman</a>
035: * @version $Revision: 57211 $
036: */
037:
038: public class MassiveTest extends MQBase {
039: static byte[] PERFORMANCE_TEST_DATA_PAYLOAD = new byte[10 * 1024];
040:
041: public MassiveTest(String name) {
042: super (name);
043: }
044:
045: /**
046: * Should be run with large iteration count!!!!!
047: */
048: public void runMassiveTest() throws Exception {
049: // Clean testarea up
050: drainTopic();
051:
052: int ic = getIterationCount();
053:
054: // Set up a durable subscriber
055: IntRangeMessageFilter f1 = new IntRangeMessageFilter(
056: javax.jms.BytesMessage.class, "MASSIVE_NR", 0, ic);
057:
058: TopicWorker sub1 = new TopicWorker(SUBSCRIBER, TRANS_NONE, f1);
059: Thread t1 = new Thread(sub1);
060: t1.start();
061:
062: // Publish
063: ByteIntRangeMessageCreator c1 = new ByteIntRangeMessageCreator(
064: "MASSIVE_NR", 0);
065: TopicWorker pub1 = new TopicWorker(PUBLISHER, TRANS_NONE, c1,
066: ic);
067: pub1.connect();
068: pub1.publish();
069:
070: Assert.assertEquals(
071: "Publisher did not publish correct number of messages "
072: + pub1.getMessageHandled(), ic, pub1
073: .getMessageHandled());
074:
075: log.debug("Sleeping for " + ((ic * 10) / 60000) + " minutes");
076: // let sub1 have some time to handle the messages.
077: try {
078: Thread.sleep(ic * 10);
079: } catch (InterruptedException e) {
080: }
081: log.debug("Woke up");
082: Assert.assertEquals(
083: "Subscriber did not get correct number of messages "
084: + sub1.getMessageHandled(), ic, sub1
085: .getMessageHandled());
086:
087: // Take down first sub
088: sub1.close();
089: t1.interrupt();
090: pub1.close();
091: }
092:
093: public void runMassivTestFailingSub() throws Exception {
094: // Clean testarea up
095: drainTopic();
096:
097: int ic = getIterationCount();
098:
099: // Set up a subscriber
100: IntRangeMessageFilter f1 = new IntRangeMessageFilter(
101: javax.jms.BytesMessage.class, "MASSIVE_NR", 0, ic);
102:
103: TopicWorker sub1 = new TopicWorker(SUBSCRIBER, TRANS_NONE, f1);
104: Thread t1 = new Thread(sub1);
105: t1.start();
106:
107: // Set up a failing sub
108: FailingSubWorker sub2 = new FailingSubWorker();
109: sub2.setSubscriberAttrs(SUBSCRIBER, TRANS_NONE, f1);
110: Thread tf = new Thread(sub2);
111: tf.start();
112:
113: // Publish
114: ByteIntRangeMessageCreator c1 = new ByteIntRangeMessageCreator(
115: "MASSIVE_NR", 0);
116: TopicWorker pub1 = new TopicWorker(PUBLISHER, TRANS_NONE, c1,
117: ic);
118: pub1.connect();
119: pub1.publish();
120:
121: Assert.assertEquals(
122: "Publisher did not publish correct number of messages "
123: + pub1.getMessageHandled(), ic, pub1
124: .getMessageHandled());
125:
126: log.debug("Sleeping for " + ((ic * 10) / 60000) + " minutes");
127: // let sub1 have some time to handle the messages.
128: try {
129: Thread.sleep(ic * 10);
130: } catch (InterruptedException e) {
131: }
132: log.debug("Woke up");
133: Assert.assertEquals(
134: "Subscriber did not get correct number of messages "
135: + sub1.getMessageHandled(), ic, sub1
136: .getMessageHandled());
137:
138: // Take down
139: sub1.close();
140: t1.interrupt();
141: pub1.close();
142: sub2.setStoped();
143: tf.interrupt();
144: tf.interrupt();
145: sub2.close();
146: }
147:
148: public static junit.framework.Test suite() throws Exception {
149:
150: TestSuite suite = new TestSuite();
151: suite.addTest(new MassiveTest("runMassiveTest"));
152:
153: //suite.addTest(new DurableSubscriberTest("testBadClient"));
154: return suite;
155: }
156:
157: public static void main(String[] args) {
158: try {
159: MassiveTest t = new MassiveTest("runMassiveTest");
160: t.setUp();
161: t.runMassiveTest();
162: } catch (Exception ex) {
163: System.err.println("Ex: " + ex);
164: ex.printStackTrace();
165: }
166:
167: }
168:
169: public class ByteIntRangeMessageCreator extends
170: IntRangeMessageCreator {
171: int start = 0;
172:
173: public ByteIntRangeMessageCreator(String property) {
174: super (property);
175: }
176:
177: public ByteIntRangeMessageCreator(String property, int start) {
178: super (property);
179: this .start = start;
180: }
181:
182: public Message createMessage(int nr) throws JMSException {
183: if (session == null)
184: throw new JMSException("Session not allowed to be null");
185:
186: BytesMessage msg = session.createBytesMessage();
187: msg.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
188: msg.setStringProperty(property, String.valueOf(start + nr));
189: return msg;
190: }
191: }
192:
193: public class FailingSubWorker extends TopicWorker {
194: int check = 0;
195:
196: //Only reveice firts message
197: public void onMessage(Message msg) {
198: check++;
199: if (check > 1)
200: log.warn("Got called while asleep!! " + check);
201: while (!stopRequested) {
202: sleep(2000);
203: }
204: }
205: }
206: } // MassiveTest
|