001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: *
017: */
018:
019: package org.apache.jmeter.protocol.jms.sampler;
020:
021: import org.apache.jorphan.logging.LoggingManager;
022: import org.apache.log.Logger;
023:
024: import javax.jms.*;
025:
026: /**
027: * Receiver of pseudo-synchronous reply messages.
028: *
029: * @author Martijn Blankestijn
030: * @version $Id: Receiver.java 493789 2007-01-07 18:10:21Z sebb $.
031: */
032: public class Receiver implements Runnable {
033: private boolean active;
034:
035: private QueueSession session;
036:
037: private QueueReceiver consumer;
038:
039: private QueueConnection conn;
040:
041: // private static Receiver receiver;
042: static Logger log = LoggingManager.getLoggerForClass();
043:
044: private Receiver(QueueConnectionFactory factory, Queue receiveQueue)
045: throws JMSException {
046: conn = factory.createQueueConnection();
047: session = conn.createQueueSession(false,
048: Session.AUTO_ACKNOWLEDGE);
049: consumer = session.createReceiver(receiveQueue);
050: if (log.isDebugEnabled()) {
051: log.debug("Receiver - ctor. Starting connection now");
052: }
053: conn.start();
054: if (log.isInfoEnabled()) {
055: log
056: .info("Receiver - ctor. Connection to messaging system established");
057: }
058: }
059:
060: public static synchronized Receiver createReceiver(
061: QueueConnectionFactory factory, Queue receiveQueue)
062: throws JMSException {
063: // if (receiver == null) {
064: Receiver receiver = new Receiver(factory, receiveQueue);
065: Thread thread = new Thread(receiver);
066: thread.start();
067: // }
068: return receiver;
069: }
070:
071: public void run() {
072: activate();
073: Message reply;
074:
075: while (isActive()) {
076: reply = null;
077: try {
078: reply = consumer.receive(5000);
079: if (reply != null) {
080:
081: if (log.isDebugEnabled()) {
082: log.debug("Received message, correlation id:"
083: + reply.getJMSCorrelationID());
084: }
085:
086: if (reply.getJMSCorrelationID() == null) {
087: log
088: .warn("Received message with correlation id null. Discarding message ...");
089: } else {
090: MessageAdmin.getAdmin().putReply(
091: reply.getJMSCorrelationID(), reply);
092: }
093: }
094:
095: } catch (JMSException e1) {
096: e1.printStackTrace();
097: }
098: }
099: // not active anymore
100: if (session != null) {
101: try {
102: session.close();
103: if (conn != null)
104: conn.close();
105: } catch (JMSException e) {
106: e.printStackTrace();
107: }
108: }
109: }
110:
111: public synchronized void activate() {
112: active = true;
113: }
114:
115: public synchronized void deactivate() {
116: active = false;
117: }
118:
119: private synchronized boolean isActive() {
120: return active;
121: }
122:
123: }
|