001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.jbossmq.perf;
023:
024: import java.util.ArrayList;
025: import java.util.Iterator;
026: import javax.jms.Message;
027: import javax.jms.MessageListener;
028: import javax.jms.QueueConnection;
029: import javax.jms.QueueConnectionFactory;
030: import javax.jms.QueueReceiver;
031: import javax.jms.QueueSender;
032: import javax.jms.QueueSession;
033: import javax.jms.Session;
034: import javax.jms.TemporaryQueue;
035: import javax.jms.Topic;
036: import javax.jms.TopicConnection;
037: import javax.jms.TopicConnectionFactory;
038: import javax.jms.TopicPublisher;
039: import javax.jms.TopicSession;
040: import javax.jms.TopicSubscriber;
041: import javax.jms.Queue;
042: import javax.naming.Context;
043:
044: import org.jboss.test.JBossTestCase;
045:
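/**
 * SendReplyPerfStressTestCase.java Some simple send/reply stress tests of
 * JBossMQ: sender threads send a message carrying a temporary reply queue to
 * the test queue or topic and block until a listener echoes the message back.
 *
 * @author
 * @version
 */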
052: public class SendReplyPerfStressTestCase extends JBossTestCase {
053: // Provider specific
054: static String TOPIC_FACTORY = "ConnectionFactory";
055: static String QUEUE_FACTORY = "ConnectionFactory";
056:
057: static String TEST_QUEUE = "queue/testQueue";
058: static String TEST_TOPIC = "topic/testTopic";
059:
060: static byte[] PERFORMANCE_TEST_DATA_PAYLOAD = new byte[10];
061:
062: //JMSProviderAdapter providerAdapter;
063: static Context context;
064: static QueueConnection queueConnection;
065: static TopicConnection topicConnection;
066:
067: public SendReplyPerfStressTestCase(String name) throws Exception {
068: super (name);
069: }
070:
071: /**
072: * The main entry-point for the JBossMQPerfStressTestCase class
073: *
074: * @param args The command line arguments
075: */
076: public static void main(String[] args) {
077:
078: String newArgs[] = { "org.jboss.test.jbossmq.perf.SendReplyPerfStressTestCase" };
079: junit.swingui.TestRunner.main(newArgs);
080: }
081:
082: public static class State {
083: public int expected;
084: public int finished = 0;
085: public ArrayList errors = new ArrayList();
086:
087: public State(int expected) {
088: this .expected = expected;
089: }
090:
091: public synchronized void addError(Throwable t) {
092: errors.add(t);
093: }
094:
095: public synchronized void finished() {
096: ++finished;
097: if (finished == expected)
098: notifyAll();
099: }
100:
101: public synchronized void waitForFinish() throws Exception {
102: if (finished == expected)
103: return;
104: wait();
105: }
106: }
107:
108: public static class MessageQueueSender implements Runnable {
109: State state;
110:
111: public MessageQueueSender(State state) {
112: this .state = state;
113: }
114:
115: public void run() {
116: try {
117: Queue queue = (Queue) context.lookup(TEST_QUEUE);
118: QueueSession session = queueConnection
119: .createQueueSession(false,
120: Session.AUTO_ACKNOWLEDGE);
121: TemporaryQueue temp = session.createTemporaryQueue();
122: Message message = session.createTextMessage();
123: message.setJMSReplyTo(temp);
124:
125: QueueSender sender = session.createSender(queue);
126: sender.send(message);
127:
128: QueueReceiver receiver = session.createReceiver(temp);
129: receiver.receive();
130: receiver.close();
131: temp.delete();
132:
133: session.close();
134: } catch (Throwable t) {
135: state.addError(t);
136: } finally {
137: state.finished();
138: }
139: }
140: }
141:
142: public static class MessageTopicSender implements Runnable {
143: State state;
144:
145: public MessageTopicSender(State state) {
146: this .state = state;
147: }
148:
149: public void run() {
150: try {
151: Topic topic = (Topic) context.lookup(TEST_TOPIC);
152: TopicSession session = topicConnection
153: .createTopicSession(false,
154: Session.AUTO_ACKNOWLEDGE);
155: Message message = session.createTextMessage();
156:
157: QueueSession qsession = queueConnection
158: .createQueueSession(false,
159: Session.AUTO_ACKNOWLEDGE);
160: TemporaryQueue temp = qsession.createTemporaryQueue();
161: message.setJMSReplyTo(temp);
162:
163: TopicPublisher publisher = session
164: .createPublisher(topic);
165: publisher.publish(message);
166:
167: QueueReceiver receiver = qsession.createReceiver(temp);
168: receiver.receive();
169: receiver.close();
170:
171: session.close();
172: } catch (Throwable t) {
173: state.addError(t);
174: } finally {
175: state.finished();
176: }
177: }
178: }
179:
180: public static class MessageReplier implements MessageListener {
181: State state;
182:
183: public MessageReplier(State state) {
184: this .state = state;
185: }
186:
187: public void onMessage(Message message) {
188: try {
189: QueueSession session = queueConnection
190: .createQueueSession(false,
191: Session.AUTO_ACKNOWLEDGE);
192: Queue replyQueue = session.createQueue(((Queue) message
193: .getJMSReplyTo()).getQueueName());
194: QueueSender sender = session.createSender(replyQueue);
195: sender.send(message);
196: sender.close();
197: session.close();
198: } catch (Throwable t) {
199: state.addError(t);
200: }
201: }
202: }
203:
204: public void testSendReplyQueue() throws Exception {
205: drainQueue();
206:
207: // Set up the workers
208: State state = new State(getThreadCount());
209: MessageReplier replier = new MessageReplier(state);
210: Thread[] threads = new Thread[getThreadCount()];
211: for (int i = 0; i < threads.length; ++i)
212: threads[i] = new Thread(new MessageQueueSender(state));
213:
214: // Register the message listener
215: Queue queue = (Queue) context.lookup(TEST_QUEUE);
216: QueueSession session = queueConnection.createQueueSession(
217: false, Session.AUTO_ACKNOWLEDGE);
218: QueueReceiver receiver = session.createReceiver(queue);
219: receiver.setMessageListener(replier);
220: queueConnection.start();
221:
222: // Start the senders
223: for (int i = 0; i < threads.length; ++i)
224: threads[i].start();
225:
226: // Wait for it to finish
227: state.waitForFinish();
228:
229: // Report the result
230: for (Iterator i = state.errors.iterator(); i.hasNext();)
231: getLog().error("Error", (Throwable) i.next());
232: if (state.errors.size() > 0)
233: throw new RuntimeException("Test failed with "
234: + state.errors.size() + " errors");
235: }
236:
237: public void testSendReplyTopic() throws Exception {
238: // Set up the workers
239: State state = new State(getThreadCount());
240: MessageReplier replier = new MessageReplier(state);
241:
242: Thread[] threads = new Thread[getThreadCount()];
243: for (int i = 0; i < threads.length; ++i)
244: threads[i] = new Thread(new MessageTopicSender(state));
245:
246: // Register the message listener
247: Topic topic = (Topic) context.lookup(TEST_TOPIC);
248: TopicSession session = topicConnection.createTopicSession(
249: false, Session.AUTO_ACKNOWLEDGE);
250: TopicSubscriber subscriber = session.createSubscriber(topic);
251: subscriber.setMessageListener(replier);
252: topicConnection.start();
253: queueConnection.start();
254:
255: // Start the senders
256: for (int i = 0; i < threads.length; ++i)
257: threads[i].start();
258:
259: // Wait for it to finish
260: state.waitForFinish();
261:
262: // Report the result
263: for (Iterator i = state.errors.iterator(); i.hasNext();)
264: getLog().error("Error", (Throwable) i.next());
265: if (state.errors.size() > 0)
266: throw new RuntimeException("Test failed with "
267: + state.errors.size() + " errors");
268: }
269:
270: protected void setUp() throws Exception {
271: getLog().info("Starting test: " + getName());
272:
273: context = getInitialContext();
274:
275: QueueConnectionFactory queueFactory = (QueueConnectionFactory) context
276: .lookup(QUEUE_FACTORY);
277: queueConnection = queueFactory.createQueueConnection();
278:
279: TopicConnectionFactory topicFactory = (TopicConnectionFactory) context
280: .lookup(TOPIC_FACTORY);
281: topicConnection = topicFactory.createTopicConnection();
282:
283: getLog().debug("Connection to JBossMQ established.");
284: }
285:
286: protected void tearDown() throws Exception {
287: getLog().info("Ended test: " + getName());
288: queueConnection.close();
289: topicConnection.close();
290: }
291:
292: private void drainQueue() throws Exception {
293: QueueSession session = queueConnection.createQueueSession(
294: false, Session.AUTO_ACKNOWLEDGE);
295: Queue queue = (Queue) context.lookup(TEST_QUEUE);
296:
297: QueueReceiver receiver = session.createReceiver(queue);
298: queueConnection.start();
299: Message message = receiver.receive(50);
300: int c = 0;
301: while (message != null) {
302: message = receiver.receive(50);
303: c++;
304: }
305:
306: if (c != 0)
307: getLog().debug(
308: " Drained " + c + " messages from the queue");
309: session.close();
310: queueConnection.stop();
311:
312: }
313: }
|