001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.jbossmq.perf;
023:
024: import javax.jms.DeliveryMode;
025: import javax.jms.Message;
026: import javax.jms.Queue;
027: import javax.jms.QueueConnection;
028: import javax.jms.QueueConnectionFactory;
029: import javax.jms.QueueReceiver;
030: import javax.jms.QueueSender;
031: import javax.jms.QueueSession;
032: import javax.jms.Session;
033: import javax.jms.TopicConnection;
034: import javax.jms.TopicConnectionFactory;
035: import javax.management.ObjectName;
036: import javax.naming.Context;
037: import javax.naming.InitialContext;
038:
039: import org.apache.log4j.Category;
040: import org.jboss.test.JBossTestCase;
041:
042: import EDU.oswego.cs.dl.util.concurrent.Semaphore;
043:
044: /**
045: * InvocationLayerStressTest.java Some simple tests of JBossMQ
046: *
047: * @author <a href="mailto:hiram.chirino@jboss.org">Hiram Chirino</a>
048: * @version $Revision: 57211 $
049: */
050:
051: public class InvocationLayerStressTest extends JBossTestCase {
052: Context context;
053: QueueConnection queueConnection;
054: TopicConnection topicConnection;
055: static final int WORKER_COUNT = Integer.parseInt(System
056: .getProperty("jbosstest.threadcount", "10"));
057: static final int MESSAGE_COUNT = Integer.parseInt(System
058: .getProperty("jbosstest.iterationcount", "500"));
059: Semaphore exitSemaphore;
060:
061: /**
062: * Constructor for the JBossMQPerfStressTestCase object
063: *
064: * @param name Description of Parameter
065: * @exception Exception Description of Exception
066: */
067: public InvocationLayerStressTest(String name) throws Exception {
068: super (name);
069: }
070:
071: public void createQueue(String name) {
072: try {
073: ObjectName objn = new ObjectName(
074: "jboss.mq:service=DestinationManager");
075: getServer().invoke(
076: objn,
077: "createQueue",
078: new Object[] { name, name },
079: new String[] { String.class.getName(),
080: String.class.getName() });
081: } catch (Exception e) {
082: e.printStackTrace();
083: }
084: }
085:
086: public void createTopic(String name) {
087: try {
088: ObjectName objn = new ObjectName(
089: "jboss.mq:service=DestinationManager");
090: getServer().invoke(
091: objn,
092: "createTopic",
093: new Object[] { name, name },
094: new String[] { String.class.getName(),
095: String.class.getName() });
096: } catch (Exception e) {
097: e.printStackTrace();
098: }
099: }
100:
101: public void deleteQueue(String name) {
102: try {
103: ObjectName objn = new ObjectName(
104: "jboss.mq:service=DestinationManager");
105: getServer().invoke(objn, "destroyQueue",
106: new Object[] { name },
107: new String[] { String.class.getName() });
108: } catch (Exception e) {
109: e.printStackTrace();
110: }
111: }
112:
113: public void deleteTopic(String name) {
114: try {
115: ObjectName objn = new ObjectName(
116: "jboss.mq:service=DestinationManager");
117: getServer().invoke(objn, "destroyTopic",
118: new Object[] { name },
119: new String[] { String.class.getName() });
120: } catch (Exception e) {
121: e.printStackTrace();
122: }
123: }
124:
125: protected void connect(String queueLoc, String topicLoc)
126: throws Exception {
127: context = new InitialContext();
128: QueueConnectionFactory queueFactory = (QueueConnectionFactory) context
129: .lookup(queueLoc);
130: queueConnection = queueFactory.createQueueConnection();
131:
132: TopicConnectionFactory topicFactory = (TopicConnectionFactory) context
133: .lookup(topicLoc);
134: topicConnection = topicFactory.createTopicConnection();
135: }
136:
137: protected void disconnect() throws Exception {
138: queueConnection.close();
139: topicConnection.close();
140: }
141:
142: class QueueWorker extends Thread {
143: String queueName;
144: Throwable exception;
145: Object signal = new Object();
146: Category log = Category.getInstance(QueueWorker.class);
147:
148: QueueWorker(String queueName, String ilType) {
149: super (queueName);
150: this .queueName = queueName;
151: this .log = Category.getInstance("QueueWorker." + queueName
152: + "." + ilType);
153: }
154:
155: public void run() {
156: log.info("QueueWorker Running: " + queueName);
157:
158: try {
159: work();
160: } catch (Throwable e) {
161: exception = e;
162: log.error("Exception:", e);
163: }
164:
165: // Signal the main thread that we are done.
166: log.debug("Notifying main thread: ");
167: exitSemaphore.release();
168:
169: log.info("QueueWorker Done: " + queueName);
170: }
171:
172: void work() throws Exception {
173: createQueue(queueName);
174: QueueSession session = queueConnection.createQueueSession(
175: false, Session.AUTO_ACKNOWLEDGE);
176: Queue queue = (Queue) context.lookup(queueName);
177:
178: // Send the messages
179: QueueSender sender = session.createSender(queue);
180: sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
181: Message message = session.createTextMessage("Test Message");
182: for (int i = 0; i < MESSAGE_COUNT; i++) {
183: sender.send(message);
184: log.debug("Sent message " + i + " to queue :"
185: + queueName);
186: }
187:
188: // Receive the messages
189: QueueReceiver receiver = session.createReceiver(queue);
190: for (int i = 0; i < MESSAGE_COUNT; i++) {
191: message = receiver.receive(5000);
192: log.debug("Received message " + i + " from queue :"
193: + queueName);
194: if (message == null)
195: fail("Received of msg timedout");
196: }
197: session.close();
198: deleteQueue(queueName);
199: }
200:
201: }
202: }
|