001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013: * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014: * License for the specific language governing permissions and limitations
015: * under the License.
016: *
017: */
018:
019: package org.apache.jmeter.protocol.jms.sampler;
020:
021: import java.util.Date;
022: import java.util.Enumeration;
023: import java.util.Hashtable;
024: import java.util.Iterator;
025: import java.util.Map;
026:
027: import javax.jms.DeliveryMode;
028: import javax.jms.JMSException;
029: import javax.jms.Message;
030: import javax.jms.Queue;
031: import javax.jms.QueueConnection;
032: import javax.jms.QueueConnectionFactory;
033: import javax.jms.QueueSender;
034: import javax.jms.QueueSession;
035: import javax.jms.Session;
036: import javax.jms.TextMessage;
037: import javax.naming.Context;
038: import javax.naming.InitialContext;
039: import javax.naming.NamingException;
040:
041: import org.apache.jmeter.config.Arguments;
042: import org.apache.jmeter.engine.event.LoopIterationEvent;
043: import org.apache.jmeter.samplers.AbstractSampler;
044: import org.apache.jmeter.samplers.Entry;
045: import org.apache.jmeter.samplers.SampleResult;
046: import org.apache.jmeter.testelement.ThreadListener;
047: import org.apache.jmeter.testelement.property.BooleanProperty;
048: import org.apache.jmeter.testelement.property.TestElementProperty;
049: import org.apache.jorphan.logging.LoggingManager;
050: import org.apache.log.Logger;
051:
052: /**
053: * Sampler for JMS Communication. <br>
054: * Created on: October 28, 2004
055: *
056: */
057: public class JMSSampler extends AbstractSampler implements
058: ThreadListener {
059:
060: private static final Logger LOGGER = LoggingManager
061: .getLoggerForClass();
062:
063: private static final int DEFAULT_TIMEOUT = 2000;
064:
065: //++ These are JMX names, and must not be changed
066: private static final String JNDI_INITIAL_CONTEXT_FACTORY = "JMSSampler.initialContextFactory"; // $NON-NLS-1$
067:
068: private static final String JNDI_CONTEXT_PROVIDER_URL = "JMSSampler.contextProviderUrl"; // $NON-NLS-1$
069:
070: private static final String JNDI_PROPERTIES = "JMSSampler.jndiProperties"; // $NON-NLS-1$
071:
072: private static final String TIMEOUT = "JMSSampler.timeout"; // $NON-NLS-1$
073:
074: private static final String IS_ONE_WAY = "JMSSampler.isFireAndForget"; // $NON-NLS-1$
075:
076: private static final String JMS_PROPERTIES = "arguments"; // $NON-NLS-1$
077:
078: private static final String RECEIVE_QUEUE = "JMSSampler.ReceiveQueue"; // $NON-NLS-1$
079:
080: private static final String XML_DATA = "HTTPSamper.xml_data"; // $NON-NLS-1$
081:
082: private final static String SEND_QUEUE = "JMSSampler.SendQueue"; // $NON-NLS-1$
083:
084: private final static String QUEUE_CONNECTION_FACTORY_JNDI = "JMSSampler.queueconnectionfactory"; // $NON-NLS-1$
085:
086: private static final String IS_NON_PERSISTENT = "JMSSampler.isNonPersistent"; // $NON-NLS-1$
087:
088: //--
089:
090: //
091: // Member variables
092: //
093: /** Factory for the connections to the queueing system. */
094: // NOTUSED private QueueConnectionFactory factory;
095: /** Queue for receiving messages (if applicable). */
096: private transient Queue receiveQueue;
097:
098: /** The session with the queueing system. */
099: private transient QueueSession session;
100:
101: /** Connection to the queueing system. */
102: private transient QueueConnection connection;
103:
104: /** Queue for sending messages. */
105: private transient Queue sendQueue;
106:
107: /** Is the communication oneway? */
108: // NOTUSED private boolean oneway;
109: /** The executor for (pseudo) synchronous communication. */
110: private transient QueueExecutor executor;
111:
112: /** Producer of the messages. */
113: private transient QueueSender producer;
114:
115: private transient Receiver receiverThread = null;
116:
117: /*
118: * (non-Javadoc)
119: *
120: * @see org.apache.jmeter.samplers.Sampler#sample(org.apache.jmeter.samplers.Entry)
121: */
122: public SampleResult sample(Entry entry) {
123: SampleResult res = new SampleResult();
124: res.setSampleLabel(getName());
125: res.setSamplerData(getContent());
126: res.setDataType(SampleResult.TEXT);
127: res.sampleStart();
128:
129: try {
130: TextMessage msg = createMessage();
131:
132: if (isOneway()) {
133: producer.send(msg);
134: res.setSuccessful(true);
135: res
136: .setResponseData("Oneway request has no response data"
137: .getBytes());
138: } else {
139: if (!useTemporyQueue()) {
140: msg.setJMSReplyTo(receiveQueue);
141: }
142:
143: Message replyMsg = executor.sendAndReceive(msg);
144: if (replyMsg == null) {
145: res.setSuccessful(false);
146: if (LOGGER.isDebugEnabled()) {
147: LOGGER.debug("No reply message received");
148: }
149: } else {
150: if (replyMsg instanceof TextMessage) {
151: res.setResponseData(((TextMessage) replyMsg)
152: .getText().getBytes());
153: } else {
154: res.setResponseData(replyMsg.toString()
155: .getBytes());
156: }
157: res.setSuccessful(true);
158: }
159: }
160: } catch (Exception e) {
161: LOGGER.warn(e.getLocalizedMessage(), e);
162: res.setResponseData(new byte[0]);
163: res.setSuccessful(false);
164: }
165: res.sampleEnd();
166: return res;
167: }
168:
169: private TextMessage createMessage() throws JMSException {
170: if (session == null) {
171: throw new IllegalStateException(
172: "Session may not be null while creating message");
173: }
174: TextMessage msg = session.createTextMessage();
175: msg.setText(getContent());
176: addJMSProperties(msg);
177: return msg;
178: }
179:
180: private void addJMSProperties(TextMessage msg) throws JMSException {
181: Map map = getArguments(JMSSampler.JMS_PROPERTIES)
182: .getArgumentsAsMap();
183: Iterator argIt = map.entrySet().iterator();
184: while (argIt.hasNext()) {
185: Map.Entry me = (Map.Entry) argIt.next();
186: String name = (String) me.getKey();
187: String value = (String) me.getValue();
188: if (LOGGER.isDebugEnabled()) {
189: LOGGER.debug("Adding property [" + name + "=" + value
190: + "]");
191: }
192: msg.setStringProperty(name, value);
193: }
194: }
195:
196: public Arguments getJMSProperties() {
197: return getArguments(JMSSampler.JMS_PROPERTIES);
198: }
199:
200: public void setJMSProperties(Arguments args) {
201: setProperty(new TestElementProperty(JMSSampler.JMS_PROPERTIES,
202: args));
203: }
204:
205: public Arguments getJNDIProperties() {
206: return getArguments(JMSSampler.JNDI_PROPERTIES);
207: }
208:
209: public void setJNDIProperties(Arguments args) {
210: setProperty(new TestElementProperty(JMSSampler.JNDI_PROPERTIES,
211: args));
212: }
213:
214: public String getQueueConnectionFactory() {
215: return getPropertyAsString(QUEUE_CONNECTION_FACTORY_JNDI);
216: }
217:
218: public void setQueueConnectionFactory(String qcf) {
219: setProperty(QUEUE_CONNECTION_FACTORY_JNDI, qcf);
220: }
221:
222: public String getSendQueue() {
223: return getPropertyAsString(SEND_QUEUE);
224: }
225:
226: public void setSendQueue(String name) {
227: setProperty(SEND_QUEUE, name);
228: }
229:
230: public String getReceiveQueue() {
231: return getPropertyAsString(RECEIVE_QUEUE);
232: }
233:
234: public void setReceiveQueue(String name) {
235: setProperty(RECEIVE_QUEUE, name);
236: }
237:
238: public String getContent() {
239: return getPropertyAsString(XML_DATA);
240: }
241:
242: public void setContent(String content) {
243: setProperty(XML_DATA, content);
244: }
245:
246: public boolean isOneway() {
247: return getPropertyAsBoolean(IS_ONE_WAY);
248: }
249:
250: public boolean isNonPersistent() {
251: return getPropertyAsBoolean(IS_NON_PERSISTENT);
252: }
253:
254: public String getInitialContextFactory() {
255: return getPropertyAsString(JMSSampler.JNDI_INITIAL_CONTEXT_FACTORY);
256: }
257:
258: public String getContextProvider() {
259: return getPropertyAsString(JMSSampler.JNDI_CONTEXT_PROVIDER_URL);
260: }
261:
262: public void setIsOneway(boolean isOneway) {
263: setProperty(new BooleanProperty(IS_ONE_WAY, isOneway));
264: }
265:
266: public void setNonPersistent(boolean value) {
267: setProperty(new BooleanProperty(IS_NON_PERSISTENT, value));
268: }
269:
270: public String toString() {
271: return getQueueConnectionFactory() + ", queue: "
272: + getSendQueue();
273: }
274:
275: public synchronized void testStarted() {
276: LOGGER.debug("testStarted, thread: "
277: + Thread.currentThread().getName());
278:
279: }
280:
281: public synchronized void testEnded() {
282: LOGGER.debug("testEndded(), thread: "
283: + Thread.currentThread().getName());
284: }
285:
286: public void testIterationStart(LoopIterationEvent event) {
287: // LOGGER.debug("testIterationStart");
288: }
289:
290: public void threadStarted() {
291: logThreadStart();
292:
293: Context context = null;
294: try {
295: context = getInitialContext();
296: Object obj = context.lookup(getQueueConnectionFactory());
297: if (!(obj instanceof QueueConnectionFactory)) {
298: String msg = "QueueConnectionFactory expected, but got "
299: + obj.getClass().getName();
300: LOGGER.fatalError(msg);
301: throw new IllegalStateException(msg);
302: }
303: QueueConnectionFactory factory = (QueueConnectionFactory) obj;
304: Queue queue = (Queue) context.lookup(getSendQueue());
305:
306: sendQueue = queue;
307: if (!useTemporyQueue()) {
308: receiveQueue = (Queue) context
309: .lookup(getReceiveQueue());
310: receiverThread = Receiver.createReceiver(factory,
311: receiveQueue);
312: }
313:
314: connection = factory.createQueueConnection();
315:
316: session = connection.createQueueSession(false,
317: Session.AUTO_ACKNOWLEDGE);
318:
319: if (LOGGER.isDebugEnabled()) {
320: LOGGER.debug("Session created");
321: }
322:
323: if (getPropertyAsBoolean(IS_ONE_WAY)) {
324: producer = session.createSender(sendQueue);
325: if (isNonPersistent()) {
326: producer
327: .setDeliveryMode(DeliveryMode.NON_PERSISTENT);
328: }
329: } else {
330:
331: if (useTemporyQueue()) {
332: executor = new TemporaryQueueExecutor(session,
333: sendQueue);
334: } else {
335: producer = session.createSender(sendQueue);
336: if (isNonPersistent()) {
337: producer
338: .setDeliveryMode(DeliveryMode.NON_PERSISTENT);
339: }
340: executor = new FixedQueueExecutor(producer,
341: getTimeout());
342: }
343: }
344: if (LOGGER.isDebugEnabled()) {
345: LOGGER.debug("Starting connection");
346: }
347:
348: connection.start();
349:
350: if (LOGGER.isDebugEnabled()) {
351: LOGGER.debug("Connection started");
352: }
353: } catch (JMSException e) {
354: LOGGER.warn(e.getLocalizedMessage(), e);
355: } catch (NamingException e) {
356: LOGGER.warn(e.getLocalizedMessage(), e);
357: } finally {
358: if (context != null) {
359: try {
360: context.close();
361: } catch (NamingException e1) {
362: // ignore
363: }
364: }
365: }
366: }
367:
368: private Context getInitialContext() throws NamingException {
369: Hashtable table = new Hashtable();
370:
371: if (getInitialContextFactory() != null
372: && getInitialContextFactory().trim().length() > 0) {
373: if (LOGGER.isDebugEnabled())
374: LOGGER.debug("Using InitialContext ["
375: + getInitialContextFactory() + "]");
376: table.put(Context.INITIAL_CONTEXT_FACTORY,
377: getInitialContextFactory());
378: }
379: if (getContextProvider() != null
380: && getContextProvider().trim().length() > 0) {
381: if (LOGGER.isDebugEnabled())
382: LOGGER.debug("Using Provider [" + getContextProvider()
383: + "]");
384: table.put(Context.PROVIDER_URL, getContextProvider());
385: }
386: Map map = getArguments(JMSSampler.JNDI_PROPERTIES)
387: .getArgumentsAsMap();
388: if (LOGGER.isDebugEnabled()) {
389: if (map.isEmpty()) {
390: LOGGER.debug("Empty JNDI properties");
391: } else {
392: LOGGER
393: .debug("Number of JNDI properties: "
394: + map.size());
395: }
396: }
397: Iterator it = map.keySet().iterator();
398: while (it.hasNext()) {
399: String key = (String) it.next();
400: table.put(key, map.get(key));
401: }
402:
403: Context context = new InitialContext(table);
404: if (LOGGER.isDebugEnabled()) {
405: printEnvironment(context);
406: }
407: return context;
408: }
409:
410: private void printEnvironment(Context context)
411: throws NamingException {
412: Hashtable env = context.getEnvironment();
413: LOGGER.debug("Initial Context Properties");
414: Enumeration keys = env.keys();
415: while (keys.hasMoreElements()) {
416: String key = (String) keys.nextElement();
417: LOGGER.debug(key + "=" + env.get(key));
418: }
419: }
420:
421: private void logThreadStart() {
422: if (LOGGER.isDebugEnabled()) {
423: LOGGER.debug("Thread started " + new Date());
424: LOGGER.debug("JMSSampler: ["
425: + Thread.currentThread().getName()
426: + "], hashCode=[" + hashCode() + "]");
427: LOGGER.debug("QCF: [" + getQueueConnectionFactory()
428: + "], sendQueue=[" + getSendQueue() + "]");
429: LOGGER.debug("Timeout = " + getTimeout() + "]");
430: LOGGER.debug("Use temporary queue =" + useTemporyQueue()
431: + "]");
432: LOGGER.debug("Reply queue =" + getReceiveQueue()
433: + "]");
434: }
435: }
436:
437: public int getTimeout() {
438: if (getPropertyAsInt(TIMEOUT) < 1) {
439: return DEFAULT_TIMEOUT;
440: }
441: return getPropertyAsInt(TIMEOUT);
442: }
443:
444: /*
445: * (non-Javadoc)
446: *
447: * @see org.apache.jmeter.testelement.TestElement#threadFinished()
448: */
449: public void threadFinished() {
450: LOGGER.debug("Thread ended " + new Date());
451:
452: if (session != null)
453: try {
454: session.close();
455: } catch (JMSException e) {
456: LOGGER.info(e.getLocalizedMessage());
457:
458: }
459: if (connection != null)
460: try {
461: connection.close();
462: } catch (JMSException e) {
463: LOGGER.info(e.getLocalizedMessage());
464: }
465: if (receiverThread != null)
466: receiverThread.deactivate();
467: }
468:
469: private boolean useTemporyQueue() {
470: String recvQueue = getReceiveQueue();
471: return recvQueue == null || recvQueue.trim().length() == 0;
472: }
473:
474: public void setArguments(Arguments args) {
475: setProperty(new TestElementProperty(JMSSampler.JMS_PROPERTIES,
476: args));
477: }
478:
479: public Arguments getArguments(String name) {
480: return (Arguments) getProperty(name).getObjectValue();
481: }
482:
483: public void setTimeout(String s) {
484: setProperty(JMSSampler.TIMEOUT, s);
485: }
486:
487: /**
488: * @param string
489: */
490: public void setInitialContextFactory(String string) {
491: setProperty(JNDI_INITIAL_CONTEXT_FACTORY, string);
492:
493: }
494:
495: /**
496: * @param string
497: */
498: public void setContextProvider(String string) {
499: setProperty(JNDI_CONTEXT_PROVIDER_URL, string);
500:
501: }
502:
503: }
|