001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */package org.apache.geronimo.console.core.jms;
017:
018: import java.util.ArrayList;
019: import java.util.List;
020:
021: import javax.jms.Message;
022: import javax.jms.QueueSession;
023: import javax.jms.Topic;
024: import javax.jms.TopicConnection;
025: import javax.jms.TopicConnectionFactory;
026: import javax.jms.TopicSession;
027: import javax.jms.TopicSubscriber;
028: import javax.management.MalformedObjectNameException;
029: import javax.management.ObjectName;
030:
031: import org.apache.commons.logging.Log;
032: import org.apache.commons.logging.LogFactory;
033: import org.apache.geronimo.connector.AdminObjectWrapper;
034: import org.apache.geronimo.gbean.GBeanInfo;
035: import org.apache.geronimo.gbean.GBeanInfoBuilder;
036: import org.apache.geronimo.gbean.GBeanLifecycle;
037: import org.apache.geronimo.gbean.WaitingException;
038: import org.apache.geronimo.kernel.Kernel;
039: import org.apache.geronimo.kernel.KernelRegistry;
040: import org.apache.geronimo.kernel.management.State;
041:
042: public class TopicBrowserGBean implements GBeanLifecycle, Runnable {
043:
044: private static Log log = LogFactory.getLog(TopicBrowserGBean.class);
045:
046: private static Kernel kernel = KernelRegistry.getSingleKernel();
047:
048: static {
049: try {
050: ACTIVEMQ_CONTAINER_OBJNAME = ObjectName
051: .getInstance("geronimo.server:J2EEApplication=null,J2EEModule=org/apache/geronimo/ActiveMQServer,J2EEServer=geronimo,j2eeType=JMSServer,name=ActiveMQl");
052: ACTIVEMQ_CONNECTOR_OBJNAME = ObjectName
053: .getInstance("geronimo.server:J2EEApplication=null,J2EEServer=geronimo,JCAResource=org/apache/geronimo/SystemJMS,j2eeType=JCAManagedConnectionFactory,name=DefaultActiveMQConnectionFactory");
054: } catch (MalformedObjectNameException moe) {
055: log.warn("Could not initialize ObjectName", moe);
056: }
057: }
058:
059: private static ObjectName ACTIVEMQ_CONTAINER_OBJNAME;
060:
061: private static ObjectName ACTIVEMQ_CONNECTOR_OBJNAME;
062:
063: String subscriberName;
064:
065: TopicConnectionFactory tConFactory;
066:
067: TopicConnection tConnection;
068:
069: AdminObjectWrapper connectionFactoryWrapper, topicWrapper;
070:
071: TopicSession tSession;
072:
073: TopicSubscriber tSubscriber;
074:
075: Topic topic;
076:
077: Thread t;
078:
079: boolean stop;
080:
081: public void run() {
082: try {
083: tConFactory = (TopicConnectionFactory) connectionFactoryWrapper
084: .$getResource();
085: topic = (Topic) topicWrapper.$getResource();
086: tConnection = tConFactory.createTopicConnection();
087: tConnection.setClientID(subscriberName);
088: tSession = tConnection.createTopicSession(false,
089: QueueSession.AUTO_ACKNOWLEDGE);
090: tSubscriber = tSession.createDurableSubscriber(topic,
091: subscriberName);
092: tConnection.start();
093: while (!stop) {
094: Thread.yield();
095: }
096: if (tSession != null) {
097: tSession.close();
098: }
099: if (tConnection != null) {
100: // If the activeMQ connector or container is not running there
101: // is no need to close the connection.
102: // Closing the connection would fail anyway.
103: if (((Integer) kernel.getAttribute(
104: ACTIVEMQ_CONTAINER_OBJNAME, "state"))
105: .intValue() == State.RUNNING_INDEX
106: && ((Integer) kernel.getAttribute(
107: ACTIVEMQ_CONNECTOR_OBJNAME, "state"))
108: .intValue() == State.RUNNING_INDEX) {
109: tConnection.close();
110: }
111: }
112: } catch (Exception e) {
113: throw new RuntimeException(e);
114: }
115: t = null;
116: log.debug("Worker thread stopped.");
117: }
118:
119: public TopicBrowserGBean(String subscriberName,
120: AdminObjectWrapper connectionFactoryWrapper,
121: AdminObjectWrapper topicWrapper) {
122: this .subscriberName = subscriberName + "@"
123: + this .getClass().getName();
124: this .connectionFactoryWrapper = connectionFactoryWrapper;
125: this .topicWrapper = topicWrapper;
126: }
127:
128: /**
129: * Start the connection on a topic and add a durable subscription.
130: *
131: * @see org.apache.geronimo.gbean.GBeanLifecycle#doStart()
132: */
133: public void doStart() throws WaitingException, Exception {
134: t = new Thread(this );
135: t.start();
136: log.debug("Subscribed to topic.");
137: }
138:
139: /**
140: * Close the connection and unregister durable subscription.
141: *
142: * @see org.apache.geronimo.gbean.GBeanLifecycle#doStop()
143: */
144: public void doStop() throws WaitingException, Exception {
145: stop = true;
146: log.debug("Unsubscribed to topic.");
147: }
148:
149: public void doFail() {
150: stop = true;
151: log.warn("GBean failed.");
152: }
153:
154: /**
155: * Get all the messages since the last call to getMessages(). If this is the
156: * first call returns all the messages sent to the Topic
157: *
158: * @return all the messages since the last call to getMessages() or all the
159: * messages sent to the topic if this is there was no previous call.
160: * @throws Exception
161: */
162: public List getMessages() throws Exception {
163: List ret = new ArrayList();
164: Message m = null;
165: do {
166: m = tSubscriber.receiveNoWait();
167: if (m != null) {
168: ret.add(m);
169: }
170: } while (m != null);
171: return ret;
172: }
173:
174: /**
175: * Remove a durable subscription.
176: */
177: public void unsubscribe() throws Exception {
178: if (tSubscriber != null) {
179: tSubscriber.close();
180: if (tSession != null) {
181: tSession.unsubscribe(subscriberName);
182: log.debug(subscriberName + " unsubscribed from Topic "
183: + topic.getTopicName() + ".");
184: }
185: }
186: }
187:
188: public static final GBeanInfo GBEAN_INFO;
189:
190: static {
191: GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic(
192: "Topic Browser GBean", TopicBrowserGBean.class);
193: infoFactory.addAttribute("subscriberName", String.class, true);
194:
195: infoFactory.addReference("ConnectionFactoryWrapper",
196: AdminObjectWrapper.class);
197: infoFactory.addReference("TopicWrapper",
198: AdminObjectWrapper.class);
199:
200: infoFactory.addOperation("getMessages");
201: infoFactory.addOperation("unsubscribe");
202:
203: infoFactory.setConstructor(new String[] { "subscriberName",
204: "ConnectionFactoryWrapper", "TopicWrapper" });
205:
206: GBEAN_INFO = infoFactory.getBeanInfo();
207: }
208:
209: public static GBeanInfo getGBeanInfo() {
210: return GBEAN_INFO;
211: }
212:
213: }
|