001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: *
017: */
018:
019: package org.apache.jmeter.protocol.jms.client;
020:
021: import javax.naming.Context;
022: import javax.naming.InitialContext;
023: import javax.naming.NamingException;
024: import javax.jms.JMSException;
025: import javax.jms.MessageListener;
026: import javax.jms.Topic;
027: import javax.jms.TopicConnection;
028: import javax.jms.TopicSession;
029: import javax.jms.TopicSubscriber;
030:
031: import org.apache.jorphan.logging.LoggingManager;
032: import org.apache.log.Logger;
033:
034: /**
035: * @author pete
036: *
037: * OnMessageSubscriber is designed to create the connection, session and
038: * subscriber. The sampler is responsible for implementing
039: * javax.jms.MessageListener interface and onMessage(Message msg) method.
040: *
041: * The implementation provides a close() method to clean up the client at the
042: * end of a test. This is important to make sure there aren't any zombie threads
043: * or odd memory leaks.
044: */
045: public class OnMessageSubscriber {
046:
047: static Logger log = LoggingManager.getLoggerForClass();
048:
049: private TopicConnection CONN = null;
050:
051: private TopicSession SESSION = null;
052:
053: private Topic TOPIC = null;
054:
055: private TopicSubscriber SUBSCRIBER = null;
056:
057: /**
058: *
059: */
060: public OnMessageSubscriber() {
061: super ();
062: }
063:
064: /**
065: * Constructor takes the necessary JNDI related parameters to create a
066: * connection and begin receiving messages.
067: *
068: * @param useProps
069: * @param jndi
070: * @param url
071: * @param connfactory
072: * @param topic
073: * @param useAuth
074: * @param user
075: * @param pwd
076: */
077: public OnMessageSubscriber(boolean useProps, String jndi,
078: String url, String connfactory, String topic,
079: String useAuth, String user, String pwd) {
080: Context ctx = initJNDI(useProps, jndi, url, useAuth, user, pwd);
081: if (ctx != null) {
082: initConnection(ctx, connfactory, topic);
083: } else {
084: log
085: .error("Could not initialize JNDI Initial Context Factory");
086: }
087: }
088:
089: /**
090: * initialize the JNDI intial context
091: *
092: * @param useProps
093: * @param jndi
094: * @param url
095: * @param useAuth
096: * @param user
097: * @param pwd
098: * @return the context or null
099: */
100: public Context initJNDI(boolean useProps, String jndi, String url,
101: String useAuth, String user, String pwd) {
102: if (useProps) {
103: try {
104: return new InitialContext();
105: } catch (NamingException e) {
106: log.error(e.getMessage());
107: return null;
108: }
109: } else {
110: return InitialContextFactory.lookupContext(jndi, url,
111: useAuth, user, pwd);
112: }
113: }
114:
115: /**
116: * Initialize the connection, session and subscriber
117: *
118: * @param ctx
119: * @param connfactory
120: * @param topic
121: */
122: public void initConnection(Context ctx, String connfactory,
123: String topic) {
124: try {
125: this .CONN = ConnectionFactory.getTopicConnection();
126: this .TOPIC = InitialContextFactory.lookupTopic(ctx, topic);
127: this .SESSION = this .CONN.createTopicSession(false,
128: TopicSession.AUTO_ACKNOWLEDGE);
129: this .SUBSCRIBER = this .SESSION.createSubscriber(this .TOPIC);
130: log.info("created the topic connection successfully");
131: } catch (JMSException e) {
132: log.error("Connection error: " + e.getMessage());
133: }
134: }
135:
136: /**
137: * resume will call Connection.start() to begin receiving inbound messages.
138: */
139: public void resume() {
140: try {
141: this .CONN.start();
142: } catch (JMSException e) {
143: log.error("failed to start recieving");
144: }
145: }
146:
147: /**
148: * close will close all the objects and set them to null.
149: */
150: public void close() {
151: try {
152: log.info("Subscriber closed");
153: this .SUBSCRIBER.close();
154: this .SESSION.close();
155: this .CONN.close();
156: this .SUBSCRIBER = null;
157: this .SESSION = null;
158: this .CONN = null;
159: } catch (JMSException e) {
160: log.error(e.getMessage());
161: } catch (Throwable e) {
162: log.error(e.getMessage());
163: }
164: }
165:
166: /**
167: * The sample uses this method to set itself as the listener. That means the
168: * sampler need to implement MessageListener interface.
169: *
170: * @param listener
171: */
172: public void setMessageListener(MessageListener listener) {
173: try {
174: this .SUBSCRIBER.setMessageListener(listener);
175: } catch (JMSException e) {
176: log.error(e.getMessage());
177: }
178: }
179: }
|