001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.cocoon.components.jms;
018:
019: import java.util.Hashtable;
020: import java.util.Iterator;
021: import java.util.LinkedList;
022: import java.util.List;
023:
024: import javax.jms.JMSException;
025: import javax.jms.MessageListener;
026: import javax.jms.Session;
027: import javax.jms.Topic;
028: import javax.jms.TopicConnection;
029: import javax.jms.TopicConnectionFactory;
030: import javax.jms.TopicPublisher;
031: import javax.jms.TopicSession;
032: import javax.jms.TopicSubscriber;
033: import javax.naming.Context;
034: import javax.naming.InitialContext;
035: import javax.naming.NamingException;
036:
037: import org.apache.avalon.framework.CascadingException;
038: import org.apache.avalon.framework.activity.Disposable;
039: import org.apache.avalon.framework.activity.Initializable;
040: import org.apache.avalon.framework.configuration.Configurable;
041: import org.apache.avalon.framework.configuration.Configuration;
042: import org.apache.avalon.framework.configuration.ConfigurationException;
043: import org.apache.avalon.framework.logger.AbstractLogEnabled;
044: import org.apache.avalon.framework.parameters.Parameters;
045: import org.apache.avalon.framework.thread.ThreadSafe;
046:
047: /**
048: * JMSConnection properties container plus utilities.
049: *
050: * <p>Configuration:</p>
051: * <pre><jndi-info>
052: * <parameter name="" value=""/>
053: * <jndi-info></pre>
054: *
055: * <p>Other parameters:</p>
056: * <table>
057: * <tbody>
058: * <tr><td>topic-factory </td><td><i>(required, no default)</i></td></tr>
059: * <tr><td>topic </td><td><i>(required, no default)</i></td></tr>
060: * <tr><td>ack-mode </td><td>("dups")</td></tr>
061: * <tr><td>durable-subscription-id </td><td><i>(optional)</i></td></tr>
062: * </tbody>
063: * </table>
064: *
065: * @version CVS $Id: JMSConnectionImpl.java 433543 2006-08-22 06:22:54Z crossley $
066: * @author <a href="mailto:haul@apache.org">haul</a>
067: * @deprecated use {@link org.apache.cocoon.components.jms.JMSConnectionManager} instead
068: */
069: public class JMSConnectionImpl extends AbstractLogEnabled implements
070: Configurable, Disposable, ThreadSafe, Initializable,
071: JMSConnection {
072:
073: private boolean available = false;
074: protected String topicFactoryName;
075: protected String topicName;
076: protected String ackModeName = "dups";
077: protected String durableSubscriptionID;
078:
079: protected TopicConnection connection = null;
080: protected TopicSession session = null;
081: protected List subscribers = null;
082: protected Topic topic = null;
083: protected int ackMode = Session.DUPS_OK_ACKNOWLEDGE;
084: protected Context context = null;
085: protected TopicConnectionFactory topicConnectionFactory;
086:
087: private Parameters jndiParams;
088:
089: public void configure(Configuration conf)
090: throws ConfigurationException {
091: Parameters parameters = Parameters.fromConfiguration(conf);
092: this .jndiParams = Parameters.fromConfiguration(conf
093: .getChild("jndi-info"));
094: this .topicFactoryName = parameters.getParameter(
095: "topic-factory", null);
096: this .topicName = parameters.getParameter("topic", null);
097: this .durableSubscriptionID = parameters.getParameter(
098: "durable-subscription-id", null);
099:
100: this .ackModeName = parameters.getParameter("ack-mode",
101: this .ackModeName).toLowerCase();
102: // see if an ack mode has been specified. If it hasn't
103: // then assume CLIENT_ACKNOWLEDGE mode.
104: this .ackMode = Session.CLIENT_ACKNOWLEDGE;
105: if (this .ackModeName.equals("auto")) {
106: this .ackMode = Session.AUTO_ACKNOWLEDGE;
107: } else if (this .ackModeName.equals("dups")) {
108: this .ackMode = Session.DUPS_OK_ACKNOWLEDGE;
109: } else if (!this .ackModeName.equals("client")) {
110: // ignore all ack modes, to test no acking
111: this .ackMode = -1;
112: }
113: }
114:
115: /* (non-Javadoc)
116: * @see org.apache.avalon.framework.activity.Initializable#initialize()
117: */
118: public void initialize() throws Exception {
119: try {
120: this .context = setupContext();
121: this .setupConnection();
122: this .setupSession();
123: this .available = true;
124: } catch (NamingException e) {
125: if (getLogger().isWarnEnabled()) {
126: Throwable rootCause = e.getRootCause();
127: if (rootCause != null) {
128: String message = e.getRootCause().getMessage();
129: if (rootCause instanceof ClassNotFoundException) {
130: String info = "WARN! *** JMS block is installed but jms client library not found. ***\n"
131: + "- For the jms block to work you must install and start a JMS server and "
132: + "place the client jar in WEB-INF/lib.";
133: if (message.indexOf("exolab") > 0) {
134: info += "\n- The default server, OpenJMS is configured in cocoon.xconf but is not bundled with Cocoon.";
135: }
136: System.err.println(info);
137: getLogger().warn(info, e);
138: } else {
139: System.out.println(message);
140: getLogger()
141: .warn(
142: "Cannot get Initial Context. Is the JNDI server reachable?",
143: e);
144: }
145: } else {
146: getLogger().warn("Failed to initialize JMS.", e);
147: }
148: }
149: } catch (JMSException e) {
150: if (getLogger().isWarnEnabled()) {
151: getLogger().warn("Failed to initialize JMS.", e);
152: }
153: }
154: }
155:
156: /*
157: * @see org.apache.avalon.framework.activity.Disposable#dispose()
158: */
159: public void dispose() {
160: try {
161: this .disconnect();
162: } catch (JMSException e) {
163: } catch (NamingException e) {
164: }
165: }
166:
167: /**
168: * Register a new TopicListener for this connection.
169: *
170: * @param listener
171: * @param selector
172: */
173: public synchronized void registerListener(MessageListener listener,
174: String selector) throws CascadingException, JMSException,
175: NamingException {
176:
177: if (!this .available) {
178: // Connection was not successfully initialized.
179: throw new CascadingException(
180: "Attempt to register Listener on unavailable JMS Connection");
181: }
182:
183: TopicSubscriber subscriber = null;
184: if (this .durableSubscriptionID != null) {
185: subscriber = this .getSession().createDurableSubscriber(
186: this .topic, this .durableSubscriptionID, selector,
187: false);
188: } else {
189: subscriber = this .getSession().createSubscriber(this .topic,
190: selector, false);
191: }
192: if (this .subscribers == null) {
193: this .subscribers = new LinkedList();
194: }
195: this .subscribers.add(subscriber);
196:
197: subscriber.setMessageListener(listener);
198: }
199:
200: /**
201: * Get a new TopicPublisher for this connection.
202: *
203: * @return TopicPublisher
204: * @throws JMSException
205: * @throws NamingException
206: */
207: public TopicPublisher getPublisher() throws JMSException,
208: NamingException {
209: TopicSession session = this .getSession();
210: if (session != null) {
211: return session.createPublisher(this .topic);
212: } else {
213: return null;
214: }
215: }
216:
217: /**
218: * Get the session associated with this connection. This is needed for example to
219: * create messages.
220: *
221: * @return session associated with this connection
222: * @throws NamingException
223: * @throws JMSException
224: */
225: public TopicSession getSession() throws NamingException,
226: JMSException {
227: return this .session;
228: }
229:
230: /**
231: * Get initial context.
232: *
233: * @return initial context
234: * @throws NamingException
235: */
236: protected Context setupContext() throws NamingException {
237:
238: String[] jndiKeys = jndiParams.getNames();
239: InitialContext ctx;
240: if (jndiKeys.length > 0) {
241: // Params specified in cocoon.xconf
242: Hashtable properties = null;
243: properties = new Hashtable();
244: for (int i = 0; i < jndiKeys.length; i++) {
245: properties.put(jndiKeys[i], jndiParams.getParameter(
246: jndiKeys[i], ""));
247: }
248: ctx = new InitialContext(properties);
249: } else {
250: // Use jndi.properties from the classpath or container
251: ctx = new InitialContext();
252: }
253: return ctx;
254: }
255:
256: /**
257: * Setup connection.
258: *
259: * @throws NamingException
260: * @throws JMSException
261: */
262: private void setupConnection() throws NamingException, JMSException {
263: // setup JMS connection
264: //this.context = this.getContext();
265: if (this .context != null) {
266: this .topicConnectionFactory = (TopicConnectionFactory) this .context
267: .lookup(this .topicFactoryName);
268: this .connection = this .topicConnectionFactory
269: .createTopicConnection();
270: this .connection.start();
271: }
272: }
273:
274: /**
275: * Setup session for this connection.
276: *
277: * @throws JMSException
278: */
279: private void setupSession() throws JMSException {
280: if (this .connection != null) {
281: this .session = connection.createTopicSession(false,
282: this .ackMode);
283: this .topic = session.createTopic(this .topicName);
284: }
285: }
286:
287: /**
288: * Disconnect session and connection, close all subscribers.
289: *
290: * @throws JMSException
291: * @throws NamingException
292: */
293: private void disconnect() throws JMSException, NamingException {
294: if (this .subscribers != null) {
295: for (Iterator i = this .subscribers.iterator(); i.hasNext();) {
296: ((TopicSubscriber) i.next()).close();
297: }
298: this.subscribers.clear();
299: }
300: if (this.session != null) {
301: this.session.close();
302: }
303: if (this.connection != null) {
304: this.connection.close();
305: }
306: }
307:
308: }
|