001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: *
017: */
018:
019: package org.apache.jmeter.protocol.jms.client;
020:
021: import javax.jms.JMSException;
022: import javax.jms.Message;
023: import javax.jms.TextMessage;
024: import javax.jms.Topic;
025: import javax.jms.TopicConnection;
026: import javax.jms.TopicSession;
027: import javax.jms.TopicSubscriber;
028: import javax.naming.Context;
029: import javax.naming.InitialContext;
030: import javax.naming.NamingException;
031:
032: import org.apache.jorphan.logging.LoggingManager;
033: import org.apache.log.Logger;
034:
035: public class ReceiveSubscriber implements Runnable {
036:
037: private static Logger log = LoggingManager.getLoggerForClass();
038:
039: private TopicConnection CONN = null;
040:
041: private TopicSession SESSION = null;
042:
043: private Topic TOPIC = null;
044:
045: private TopicSubscriber SUBSCRIBER = null;
046:
047: private byte[] RESULT = null;
048:
049: private int counter;
050:
051: private int loop = 1; // TODO never read
052:
053: private StringBuffer buffer = new StringBuffer();
054:
055: private volatile boolean RUN = true;
056: // Needs to be volatile to ensure value is picked up
057:
058: private Thread CLIENTTHREAD = null;
059:
060: /**
061: *
062: */
063: public ReceiveSubscriber() {
064: super ();
065: }
066:
067: public ReceiveSubscriber(boolean useProps, String jndi, String url,
068: String connfactory, String topic, String useAuth,
069: String user, String pwd) {
070: Context ctx = initJNDI(useProps, jndi, url, useAuth, user, pwd);
071: if (ctx != null) {
072: initConnection(ctx, connfactory, topic);
073: } else {
074: log
075: .error("Could not initialize JNDI Initial Context Factory");
076: }
077: }
078:
079: /**
080: * Initialize the JNDI initial context
081: *
082: * @param useProps
083: * @param jndi
084: * @param url
085: * @param useAuth
086: * @param user
087: * @param pwd
088: * @return the JNDI initial context or null
089: */
090: public Context initJNDI(boolean useProps, String jndi, String url,
091: String useAuth, String user, String pwd) {
092: if (useProps) {
093: try {
094: return new InitialContext();
095: } catch (NamingException e) {
096: log.error(e.getMessage());
097: return null;
098: }
099: } else {
100: return InitialContextFactory.lookupContext(jndi, url,
101: useAuth, user, pwd);
102: }
103: }
104:
105: /**
106: * Create the connection, session and topic subscriber
107: *
108: * @param ctx
109: * @param connfactory
110: * @param topic
111: */
112: public void initConnection(Context ctx, String connfactory,
113: String topic) {
114: try {
115: ConnectionFactory.getTopicConnectionFactory(ctx,
116: connfactory);
117: this .CONN = ConnectionFactory.getTopicConnection();
118: this .TOPIC = InitialContextFactory.lookupTopic(ctx, topic);
119: this .SESSION = this .CONN.createTopicSession(false,
120: TopicSession.AUTO_ACKNOWLEDGE);
121: this .SUBSCRIBER = this .SESSION.createSubscriber(this .TOPIC);
122: log.info("created the topic connection successfully");
123: } catch (JMSException e) {
124: log.error("Connection error: " + e.getMessage());
125: }
126: }
127:
128: /**
129: * Set the number of iterations for each call to sample()
130: *
131: * @param loop
132: */
133: public void setLoop(int loop) {
134: this .loop = loop;
135: }
136:
137: /**
138: * Resume will call Connection.start() and begin receiving messages from the
139: * JMS provider.
140: */
141: public void resume() {
142: if (this .CONN == null) {
143: log.error("Connection not set up");
144: return;
145: }
146: try {
147: this .CONN.start();
148: } catch (JMSException e) {
149: log.error("failed to start recieving");
150: }
151: }
152:
153: /**
154: * Get the message as a string
155: *
156: */
157: public String getMessage() {
158: return this .buffer.toString();
159: }
160:
161: /**
162: * Get the message(s) as an array of byte[]
163: *
164: */
165: public byte[] getByteResult() {
166: if (this .buffer.length() > 0) {
167: this .RESULT = this .buffer.toString().getBytes();
168: }
169: return this .RESULT;
170: }
171:
172: /**
173: * close() will stop the connection first. Then it closes the subscriber,
174: * session and connection and sets them to null.
175: */
176: public synchronized void close() {
177: try {
178: this .CONN.stop();
179: this .SUBSCRIBER.close();
180: this .SESSION.close();
181: this .CONN.close();
182: this .SUBSCRIBER = null;
183: this .SESSION = null;
184: this .CONN = null;
185: this .RUN = false;
186: this .CLIENTTHREAD.interrupt();
187: this .CLIENTTHREAD = null;
188: this .buffer.setLength(0);
189: this .buffer = null;
190: } catch (JMSException e) {
191: log.error(e.getMessage());
192: } catch (Throwable e) {
193: log.error(e.getMessage());
194: }
195: }
196:
197: /**
198: * Clear will set the buffer to zero and the result objects to null. Clear
199: * should be called at the end of a sample.
200: */
201: public void clear() {
202: this .buffer.setLength(0);
203: this .RESULT = null;
204: }
205:
206: /**
207: * Increment the count and return the new value
208: *
209: * @param increment
210: */
211: public synchronized int count(int increment) {
212: counter += increment;
213: return counter;
214: }
215:
216: /**
217: * Reset will reset the counter and prepare for the next sample() call.
218: *
219: */
220: public synchronized int resetCount() {
221: counter = 0;
222: return counter;
223: }
224:
225: /**
226: * start will create a new thread and pass this class. once the thread is
227: * created, it calls Thread.start().
228: */
229: public void start() {
230: this .CLIENTTHREAD = new Thread(this , "Subscriber2");
231: this .CLIENTTHREAD.start();
232: }
233:
234: /**
235: * run calls listen to begin listening for inboud messages from the
236: * provider.
237: */
238: public void run() {
239: ReceiveSubscriber.this .listen();
240: }
241:
242: /**
243: * Listen for inbound messages
244: */
245: protected void listen() {
246: log.info("Subscriber2.listen() called");
247: while (RUN) {
248: if (SUBSCRIBER == null) {
249: log.error("Subscriber has not been set up");
250: break;
251: }
252: try {
253: Message message = this .SUBSCRIBER.receive();
254: if (message != null && message instanceof TextMessage) {
255: TextMessage msg = (TextMessage) message;
256: if (msg.getText().trim().length() > 0) {
257: this .buffer.append(msg.getText());
258: count(1);
259: }
260: }
261: } catch (JMSException e) {
262: log.info("Communication error: " + e.getMessage());
263: }
264: }
265: }
266: }
|